aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/volume_layout.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/volume_layout.go')
-rw-r--r--weed/topology/volume_layout.go108
1 files changed, 69 insertions, 39 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 71a071e2f..9e84fd2da 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -9,17 +9,20 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
- rp *storage.ReplicaPlacement
- ttl *storage.TTL
- vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
- readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes
- oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes
+ rp *super_block.ReplicaPlacement
+ ttl *needle.TTL
+ vid2location map[needle.VolumeId]*VolumeLocationList
+ writables []needle.VolumeId // transient array of writable volume id
+ readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
+ oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
volumeSizeLimit uint64
+ replicationAsMin bool
accessLock sync.RWMutex
}
@@ -29,15 +32,16 @@ type VolumeLayoutStats struct {
FileCount uint64
}
-func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
- vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
- readonlyVolumes: make(map[storage.VolumeId]bool),
- oversizedVolumes: make(map[storage.VolumeId]bool),
+ vid2location: make(map[needle.VolumeId]*VolumeLocationList),
+ writables: *new([]needle.VolumeId),
+ readonlyVolumes: make(map[needle.VolumeId]bool),
+ oversizedVolumes: make(map[needle.VolumeId]bool),
volumeSizeLimit: volumeSizeLimit,
+ replicationAsMin: replicationAsMin,
}
}
@@ -49,6 +53,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
+ defer vl.ensureCorrectWritables(v)
+ defer vl.rememberOversizedVolume(v)
+
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
@@ -57,7 +64,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
for _, dn := range vl.vid2location[v.Id].list {
if vInfo, err := dn.GetVolumesById(v.Id); err == nil {
if vInfo.ReadOnly {
- glog.V(3).Infof("vid %d removed from writable", v.Id)
+ glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
vl.readonlyVolumes[v.Id] = true
return
@@ -65,23 +72,16 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
delete(vl.readonlyVolumes, v.Id)
}
} else {
- glog.V(3).Infof("vid %d removed from writable", v.Id)
+ glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
delete(vl.readonlyVolumes, v.Id)
return
}
}
- if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
- if _, ok := vl.oversizedVolumes[v.Id]; !ok {
- vl.addToWritable(v.Id)
- }
- } else {
- vl.rememberOversizedVolumne(v)
- vl.removeFromWritable(v.Id)
- }
+
}
-func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) {
+func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
if vl.isOversized(v) {
vl.oversizedVolumes[v.Id] = true
}
@@ -91,17 +91,31 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- vl.removeFromWritable(v.Id)
- delete(vl.vid2location, v.Id)
+ // remove from vid2location map
+ location, ok := vl.vid2location[v.Id]
+ if !ok {
+ return
+ }
+
+ if location.Remove(dn) {
+
+ vl.ensureCorrectWritables(v)
+
+ if location.Length() == 0 {
+ delete(vl.vid2location, v.Id)
+ }
+
+ }
}
-func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
- for _, id := range vl.writables {
- if vid == id {
- return
+func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
+ if vl.enoughCopies(v.Id) && vl.isWritable(v) {
+ if _, ok := vl.oversizedVolumes[v.Id]; !ok {
+ vl.setVolumeWritable(v.Id)
}
+ } else {
+ vl.removeFromWritable(v.Id)
}
- vl.writables = append(vl.writables, vid)
}
func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
@@ -110,7 +124,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
return !vl.isOversized(v) &&
- v.Version == storage.CurrentVersion &&
+ v.Version == needle.CurrentVersion &&
!v.ReadOnly
}
@@ -121,7 +135,7 @@ func (vl *VolumeLayout) isEmpty() bool {
return len(vl.vid2location) == 0
}
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -141,7 +155,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
return
}
-func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -158,7 +172,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
}
return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
- var vid storage.VolumeId
+ var vid needle.VolumeId
var locationList *VolumeLocationList
counter := 0
for _, v := range vl.writables {
@@ -205,7 +219,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
return counter
}
-func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
toDeleteIndex := -1
for k, id := range vl.writables {
if id == vid {
@@ -220,7 +234,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
}
return false
}
-func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool {
for _, v := range vl.writables {
if v == vid {
return false
@@ -231,7 +245,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
return true
}
-func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -245,18 +259,34 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
}
return false
}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
+ vInfo, err := dn.GetVolumesById(vid)
+ if err != nil {
+ return false
+ }
+
vl.vid2location[vid].Set(dn)
- if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
+
+ if vInfo.ReadOnly || isReadOnly {
+ return false
+ }
+
+ if vl.enoughCopies(vid) {
return vl.setVolumeWritable(vid)
}
return false
}
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool {
+ locations := vl.vid2location[vid].Length()
+ desired := vl.rp.GetCopyCount()
+ return locations == desired || (vl.replicationAsMin && locations > desired)
+}
+
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()