diff options
Diffstat (limited to 'weed/topology/volume_layout.go')
| -rw-r--r-- | weed/topology/volume_layout.go | 108 |
1 files changed, 69 insertions, 39 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 71a071e2f..9e84fd2da 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -9,17 +9,20 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { - rp *storage.ReplicaPlacement - ttl *storage.TTL - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes + rp *super_block.ReplicaPlacement + ttl *needle.TTL + vid2location map[needle.VolumeId]*VolumeLocationList + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes + oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 + replicationAsMin bool accessLock sync.RWMutex } @@ -29,15 +32,16 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - readonlyVolumes: make(map[storage.VolumeId]bool), - oversizedVolumes: make(map[storage.VolumeId]bool), + vid2location: make(map[needle.VolumeId]*VolumeLocationList), + writables: *new([]needle.VolumeId), + readonlyVolumes: make(map[needle.VolumeId]bool), + oversizedVolumes: make(map[needle.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, } } @@ -49,6 +53,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() + defer vl.ensureCorrectWritables(v) + defer vl.rememberOversizedVolume(v) + if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() } @@ -57,7 +64,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { for _, dn := range vl.vid2location[v.Id].list { if vInfo, err := dn.GetVolumesById(v.Id); err == nil { if vInfo.ReadOnly { - glog.V(3).Infof("vid %d removed from writable", v.Id) + glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) vl.readonlyVolumes[v.Id] = true return @@ -65,23 +72,16 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { delete(vl.readonlyVolumes, v.Id) } } else { - glog.V(3).Infof("vid %d removed from writable", v.Id) + glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) delete(vl.readonlyVolumes, v.Id) return } } - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { - vl.addToWritable(v.Id) - } - } else { - vl.rememberOversizedVolumne(v) - vl.removeFromWritable(v.Id) - } + } -func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { if vl.isOversized(v) { vl.oversizedVolumes[v.Id] = true } @@ -91,17 +91,31 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() - vl.removeFromWritable(v.Id) - delete(vl.vid2location, v.Id) + // remove from vid2location map + location, ok := vl.vid2location[v.Id] + if !ok { + return + } + + if location.Remove(dn) { + + vl.ensureCorrectWritables(v) + + if location.Length() == 0 { + delete(vl.vid2location, v.Id) + } + + } } -func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { - for _, id := range vl.writables { - if vid == id { - return +func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { + if vl.enoughCopies(v.Id) && vl.isWritable(v) { + if _, ok := vl.oversizedVolumes[v.Id]; !ok { + vl.setVolumeWritable(v.Id) } + } else { + vl.removeFromWritable(v.Id) } - vl.writables = append(vl.writables, vid) } func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { @@ -110,7 +124,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { return !vl.isOversized(v) && - v.Version == storage.CurrentVersion && + v.Version == needle.CurrentVersion && !v.ReadOnly } @@ -121,7 +135,7 @@ func (vl *VolumeLayout) isEmpty() bool { return len(vl.vid2location) == 0 } -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { +func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -141,7 +155,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { return } -func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -158,7 +172,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s } return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") } - var vid storage.VolumeId + var vid needle.VolumeId var locationList *VolumeLocationList counter := 0 for _, v := range vl.writables { @@ -205,7 +219,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { return counter } -func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool { toDeleteIndex := -1 for k, id := range vl.writables { if id == vid { @@ -220,7 +234,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { } return false } -func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool { for _, v := range vl.writables { if v == vid { return false @@ -231,7 +245,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { return true } -func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -245,18 +259,34 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) } return false } -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() + vInfo, err := dn.GetVolumesById(vid) + if err != nil { + return false + } + vl.vid2location[vid].Set(dn) - if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() { + + if vInfo.ReadOnly || isReadOnly { + return false + } + + if vl.enoughCopies(vid) { return vl.setVolumeWritable(vid) } return false } -func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { +func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool { + locations := vl.vid2location[vid].Length() + desired := vl.rp.GetCopyCount() + return locations == desired || (vl.replicationAsMin && locations > desired) +} + +func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() |
