aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/volume_layout.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/volume_layout.go')
-rw-r--r--weed/topology/volume_layout.go181
1 files changed, 152 insertions, 29 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 7633b28be..c7e171248 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -3,6 +3,7 @@ package topology
import (
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"math/rand"
"sync"
"time"
@@ -13,15 +14,103 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
+type copyState int
+
+const (
+ noCopies copyState = 0 + iota
+ insufficientCopies
+ enoughCopies
+)
+
+type volumeState string
+
+const (
+ readOnlyState volumeState = "ReadOnly"
+ oversizedState = "Oversized"
+)
+
+type stateIndicator func(copyState) bool
+
+func ExistCopies() stateIndicator {
+ return func(state copyState) bool { return state != noCopies }
+}
+
+func NoCopies() stateIndicator {
+ return func(state copyState) bool { return state == noCopies }
+}
+
+type volumesBinaryState struct {
+ rp *super_block.ReplicaPlacement
+ name volumeState // the name for volume state (eg. "Readonly", "Oversized")
+ indicator stateIndicator // indicate whether the volumes should be marked as `name`
+ copyMap map[needle.VolumeId]*VolumeLocationList
+}
+
+func NewVolumesBinaryState(name volumeState, rp *super_block.ReplicaPlacement, indicator stateIndicator) *volumesBinaryState {
+ return &volumesBinaryState{
+ rp: rp,
+ name: name,
+ indicator: indicator,
+ copyMap: make(map[needle.VolumeId]*VolumeLocationList),
+ }
+}
+
+func (v *volumesBinaryState) Dump() (res []uint32) {
+ for vid, list := range v.copyMap {
+ if v.indicator(v.copyState(list)) {
+ res = append(res, uint32(vid))
+ }
+ }
+ return
+}
+
+func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool {
+ list, _ := v.copyMap[vid]
+ return v.indicator(v.copyState(list))
+}
+
+func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) {
+ list, _ := v.copyMap[vid]
+ if list != nil {
+ list.Set(dn)
+ return
+ }
+ list = NewVolumeLocationList()
+ list.Set(dn)
+ v.copyMap[vid] = list
+}
+
+func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) {
+ list, _ := v.copyMap[vid]
+ if list != nil {
+ list.Remove(dn)
+ if list.Length() == 0 {
+ delete(v.copyMap, vid)
+ }
+ }
+}
+
+func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState {
+ if list == nil {
+ return noCopies
+ }
+ if list.Length() < v.rp.GetCopyCount() {
+ return insufficientCopies
+ }
+ return enoughCopies
+}
+
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *super_block.ReplicaPlacement
ttl *needle.TTL
+ diskType types.DiskType
vid2location map[needle.VolumeId]*VolumeLocationList
- writables []needle.VolumeId // transient array of writable volume id
- readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
- oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
+ writables []needle.VolumeId // transient array of writable volume id
+ readonlyVolumes *volumesBinaryState // readonly volumes
+ oversizedVolumes *volumesBinaryState // oversized volumes
volumeSizeLimit uint64
+ replicationAsMin bool
accessLock sync.RWMutex
}
@@ -31,19 +120,23 @@ type VolumeLayoutStats struct {
FileCount uint64
}
-func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
+ diskType: diskType,
vid2location: make(map[needle.VolumeId]*VolumeLocationList),
writables: *new([]needle.VolumeId),
- readonlyVolumes: make(map[needle.VolumeId]bool),
- oversizedVolumes: make(map[needle.VolumeId]bool),
+ readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
+ oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
volumeSizeLimit: volumeSizeLimit,
+ replicationAsMin: replicationAsMin,
}
}
func (vl *VolumeLayout) String() string {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
}
@@ -51,6 +144,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
+ defer vl.rememberOversizedVolume(v, dn)
+
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
@@ -61,27 +156,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if vInfo.ReadOnly {
glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
- vl.readonlyVolumes[v.Id] = true
+ vl.readonlyVolumes.Add(v.Id, dn)
return
} else {
- delete(vl.readonlyVolumes, v.Id)
+ vl.readonlyVolumes.Remove(v.Id, dn)
}
} else {
glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
- delete(vl.readonlyVolumes, v.Id)
+ vl.readonlyVolumes.Remove(v.Id, dn)
return
}
}
- vl.rememberOversizedVolume(v)
- vl.ensureCorrectWritables(v)
-
}
-func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
+func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) {
if vl.isOversized(v) {
- vl.oversizedVolumes[v.Id] = true
+ vl.oversizedVolumes.Add(v.Id, dn)
+ } else {
+ vl.oversizedVolumes.Remove(v.Id, dn)
}
}
@@ -97,7 +191,9 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if location.Remove(dn) {
- vl.ensureCorrectWritables(v)
+ vl.readonlyVolumes.Remove(v.Id, dn)
+ vl.oversizedVolumes.Remove(v.Id, dn)
+ vl.ensureCorrectWritables(v.Id)
if location.Length() == 0 {
delete(vl.vid2location, v.Id)
@@ -106,23 +202,32 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
}
-func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
- if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
- if _, ok := vl.oversizedVolumes[v.Id]; !ok {
- vl.addToWritable(v.Id)
+func (vl *VolumeLayout) EnsureCorrectWritables(v *storage.VolumeInfo) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ vl.ensureCorrectWritables(v.Id)
+}
+
+func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) {
+ if vl.enoughCopies(vid) && vl.isAllWritable(vid) {
+ if !vl.oversizedVolumes.IsTrue(vid) {
+ vl.setVolumeWritable(vid)
}
} else {
- vl.removeFromWritable(v.Id)
+ vl.removeFromWritable(vid)
}
}
-func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) {
- for _, id := range vl.writables {
- if vid == id {
- return
+func (vl *VolumeLayout) isAllWritable(vid needle.VolumeId) bool {
+ for _, dn := range vl.vid2location[vid].list {
+ if v, getError := dn.GetVolumesById(vid); getError == nil {
+ if v.ReadOnly {
+ return false
+ }
}
}
- vl.writables = append(vl.writables, vid)
+ return true
}
func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
@@ -258,6 +363,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId)
if location, ok := vl.vid2location[vid]; ok {
if location.Remove(dn) {
+ vl.readonlyVolumes.Remove(vid, dn)
+ vl.oversizedVolumes.Remove(vid, dn)
if location.Length() < vl.rp.GetCopyCount() {
glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
return vl.removeFromWritable(vid)
@@ -266,17 +373,33 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId)
}
return false
}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
+ vInfo, err := dn.GetVolumesById(vid)
+ if err != nil {
+ return false
+ }
+
vl.vid2location[vid].Set(dn)
- if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
+
+ if vInfo.ReadOnly || isReadOnly {
+ return false
+ }
+
+ if vl.enoughCopies(vid) {
return vl.setVolumeWritable(vid)
}
return false
}
+func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool {
+ locations := vl.vid2location[vid].Length()
+ desired := vl.rp.GetCopyCount()
+ return locations == desired || (vl.replicationAsMin && locations > desired)
+}
+
func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -306,10 +429,10 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
size, fileCount := vll.Stats(vid, freshThreshold)
ret.FileCount += uint64(fileCount)
ret.UsedSize += size
- if vl.readonlyVolumes[vid] {
+ if vl.readonlyVolumes.IsTrue(vid) {
ret.TotalSize += size
} else {
- ret.TotalSize += vl.volumeSizeLimit
+ ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length())
}
}