diff options
Diffstat (limited to 'weed/topology/volume_layout.go')
| -rw-r--r-- | weed/topology/volume_layout.go | 181 |
1 files changed, 152 insertions, 29 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 7633b28be..c7e171248 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -3,6 +3,7 @@ package topology import ( "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" "time" @@ -13,15 +14,103 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) +type copyState int + +const ( + noCopies copyState = 0 + iota + insufficientCopies + enoughCopies +) + +type volumeState string + +const ( + readOnlyState volumeState = "ReadOnly" + oversizedState = "Oversized" +) + +type stateIndicator func(copyState) bool + +func ExistCopies() stateIndicator { + return func(state copyState) bool { return state != noCopies } +} + +func NoCopies() stateIndicator { + return func(state copyState) bool { return state == noCopies } +} + +type volumesBinaryState struct { + rp *super_block.ReplicaPlacement + name volumeState // the name for volume state (eg. "Readonly", "Oversized") + indicator stateIndicator // indicate whether the volumes should be marked as `name` + copyMap map[needle.VolumeId]*VolumeLocationList +} + +func NewVolumesBinaryState(name volumeState, rp *super_block.ReplicaPlacement, indicator stateIndicator) *volumesBinaryState { + return &volumesBinaryState{ + rp: rp, + name: name, + indicator: indicator, + copyMap: make(map[needle.VolumeId]*VolumeLocationList), + } +} + +func (v *volumesBinaryState) Dump() (res []uint32) { + for vid, list := range v.copyMap { + if v.indicator(v.copyState(list)) { + res = append(res, uint32(vid)) + } + } + return +} + +func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool { + list, _ := v.copyMap[vid] + return v.indicator(v.copyState(list)) +} + +func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Set(dn) + return + } + list = NewVolumeLocationList() + list.Set(dn) + v.copyMap[vid] = list +} + +func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Remove(dn) + if list.Length() == 0 { + delete(v.copyMap, vid) + } + } +} + +func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { + if list == nil { + return noCopies + } + if list.Length() < v.rp.GetCopyCount() { + return insufficientCopies + } + return enoughCopies +} + // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *super_block.ReplicaPlacement ttl *needle.TTL + diskType types.DiskType vid2location map[needle.VolumeId]*VolumeLocationList - writables []needle.VolumeId // transient array of writable volume id - readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes *volumesBinaryState // readonly volumes + oversizedVolumes *volumesBinaryState // oversized volumes volumeSizeLimit uint64 + replicationAsMin bool accessLock sync.RWMutex } @@ -31,19 +120,23 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, + diskType: diskType, vid2location: make(map[needle.VolumeId]*VolumeLocationList), writables: *new([]needle.VolumeId), - readonlyVolumes: make(map[needle.VolumeId]bool), - oversizedVolumes: make(map[needle.VolumeId]bool), + readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()), + oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()), volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, } } func (vl *VolumeLayout) String() string { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit) } @@ -51,6 +144,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() + defer vl.rememberOversizedVolume(v, dn) + if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() } @@ -61,27 +156,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if vInfo.ReadOnly { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - vl.readonlyVolumes[v.Id] = true + vl.readonlyVolumes.Add(v.Id, dn) return } else { - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) } } else { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) return } } - vl.rememberOversizedVolume(v) - vl.ensureCorrectWritables(v) - } -func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) { if vl.isOversized(v) { - vl.oversizedVolumes[v.Id] = true + vl.oversizedVolumes.Add(v.Id, dn) + } else { + vl.oversizedVolumes.Remove(v.Id, dn) } } @@ -97,7 +191,9 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if location.Remove(dn) { - vl.ensureCorrectWritables(v) + vl.readonlyVolumes.Remove(v.Id, dn) + vl.oversizedVolumes.Remove(v.Id, dn) + vl.ensureCorrectWritables(v.Id) if location.Length() == 0 { delete(vl.vid2location, v.Id) @@ -106,23 +202,32 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } -func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { - vl.addToWritable(v.Id) +func (vl *VolumeLayout) EnsureCorrectWritables(v *storage.VolumeInfo) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + vl.ensureCorrectWritables(v.Id) +} + +func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) { + if vl.enoughCopies(vid) && vl.isAllWritable(vid) { + if !vl.oversizedVolumes.IsTrue(vid) { + vl.setVolumeWritable(vid) } } else { - vl.removeFromWritable(v.Id) + vl.removeFromWritable(vid) } } -func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) { - for _, id := range vl.writables { - if vid == id { - return +func (vl *VolumeLayout) isAllWritable(vid needle.VolumeId) bool { + for _, dn := range vl.vid2location[vid].list { + if v, getError := dn.GetVolumesById(vid); getError == nil { + if v.ReadOnly { + return false + } } } - vl.writables = append(vl.writables, vid) + return true } func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { @@ -258,6 +363,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) if location, ok := vl.vid2location[vid]; ok { if location.Remove(dn) { + vl.readonlyVolumes.Remove(vid, dn) + vl.oversizedVolumes.Remove(vid, dn) if location.Length() < vl.rp.GetCopyCount() { glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount()) return vl.removeFromWritable(vid) @@ -266,17 +373,33 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) } return false } -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() + vInfo, err := dn.GetVolumesById(vid) + if err != nil { + return false + } + vl.vid2location[vid].Set(dn) - if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() { + + if vInfo.ReadOnly || isReadOnly { + return false + } + + if vl.enoughCopies(vid) { return vl.setVolumeWritable(vid) } return false } +func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool { + locations := vl.vid2location[vid].Length() + desired := vl.rp.GetCopyCount() + return locations == desired || (vl.replicationAsMin && locations > desired) +} + func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -306,10 +429,10 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { size, fileCount := vll.Stats(vid, freshThreshold) ret.FileCount += uint64(fileCount) ret.UsedSize += size - if vl.readonlyVolumes[vid] { + if vl.readonlyVolumes.IsTrue(vid) { ret.TotalSize += size } else { - ret.TotalSize += vl.volumeSizeLimit + ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length()) } } |
