Diffstat (limited to 'weed/topology/volume_layout.go')
| -rw-r--r-- | weed/topology/volume_layout.go | 226 |
1 file changed, 226 insertions, 0 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
new file mode 100644
index 000000000..e500de583
--- /dev/null
+++ b/weed/topology/volume_layout.go
@@ -0,0 +1,226 @@
+package topology
+
+import (
+	"errors"
+	"fmt"
+	"math/rand"
+	"sync"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+// mapping from volume to its locations, inverted from server to volume
+type VolumeLayout struct {
+	rp              *storage.ReplicaPlacement
+	ttl             *storage.TTL
+	vid2location    map[storage.VolumeId]*VolumeLocationList
+	writables       []storage.VolumeId // transient array of writable volume id
+	volumeSizeLimit uint64
+	accessLock      sync.RWMutex
+}
+
+func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
+	return &VolumeLayout{
+		rp:              rp,
+		ttl:             ttl,
+		vid2location:    make(map[storage.VolumeId]*VolumeLocationList),
+		writables:       *new([]storage.VolumeId),
+		volumeSizeLimit: volumeSizeLimit,
+	}
+}
+
+func (vl *VolumeLayout) String() string {
+	return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
+}
+
+func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+	vl.accessLock.Lock()
+	defer vl.accessLock.Unlock()
+
+	if _, ok := vl.vid2location[v.Id]; !ok {
+		vl.vid2location[v.Id] = NewVolumeLocationList()
+	}
+	vl.vid2location[v.Id].Set(dn)
+	glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
+	if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
+		vl.addToWritable(v.Id)
+	} else {
+		vl.removeFromWritable(v.Id)
+	}
+}
+
+func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+	vl.accessLock.Lock()
+	defer vl.accessLock.Unlock()
+
+	vl.removeFromWritable(v.Id)
+	delete(vl.vid2location, v.Id)
+}
+
+func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
+	for _, id := range vl.writables {
+		if vid == id {
+			return
+		}
+	}
+	vl.writables = append(vl.writables, vid)
+}
+
+func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
+	return uint64(v.Size) < vl.volumeSizeLimit &&
+		v.Version == storage.CurrentVersion &&
+		!v.ReadOnly
+}
+
+func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+	vl.accessLock.RLock()
+	defer vl.accessLock.RUnlock()
+
+	if location := vl.vid2location[vid]; location != nil {
+		return location.list
+	}
+	return nil
+}
+
+func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
+	vl.accessLock.RLock()
+	defer vl.accessLock.RUnlock()
+
+	for _, location := range vl.vid2location {
+		nodes = append(nodes, location.list...)
+	}
+	return
+}
+
+func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
+	vl.accessLock.RLock()
+	defer vl.accessLock.RUnlock()
+
+	len_writers := len(vl.writables)
+	if len_writers <= 0 {
+		glog.V(0).Infoln("No more writable volumes!")
+		return nil, 0, nil, errors.New("No more writable volumes!")
+	}
+	if option.DataCenter == "" {
+		vid := vl.writables[rand.Intn(len_writers)]
+		locationList := vl.vid2location[vid]
+		if locationList != nil {
+			return &vid, count, locationList, nil
+		}
+		return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+	}
+	var vid storage.VolumeId
+	var locationList *VolumeLocationList
+	counter := 0
+	for _, v := range vl.writables {
+		volumeLocationList := vl.vid2location[v]
+		for _, dn := range volumeLocationList.list {
+			if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+				if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+					continue
+				}
+				if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+					continue
+				}
+				counter++
+				if rand.Intn(counter) < 1 {
+					vid, locationList = v, volumeLocationList
+				}
+			}
+		}
+	}
+	return &vid, count, locationList, nil
+}
+
+func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
+	vl.accessLock.RLock()
+	defer vl.accessLock.RUnlock()
+
+	if option.DataCenter == "" {
+		return len(vl.writables)
+	}
+	counter := 0
+	for _, v := range vl.writables {
+		for _, dn := range vl.vid2location[v].list {
+			if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+				if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+					continue
+				}
+				if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+					continue
+				}
+				counter++
+			}
+		}
+	}
+	return counter
+}
+
+func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+	toDeleteIndex := -1
+	for k, id := range vl.writables {
+		if id == vid {
+			toDeleteIndex = k
+			break
+		}
+	}
+	if toDeleteIndex >= 0 {
+		glog.V(0).Infoln("Volume", vid, "becomes unwritable")
+		vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...)
+		return true
+	}
+	return false
+}
+func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+	for _, v := range vl.writables {
+		if v == vid {
+			return false
+		}
+	}
+	glog.V(0).Infoln("Volume", vid, "becomes writable")
+	vl.writables = append(vl.writables, vid)
+	return true
+}
+
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+	vl.accessLock.Lock()
+	defer vl.accessLock.Unlock()
+
+	if location, ok := vl.vid2location[vid]; ok {
+		if location.Remove(dn) {
+			if location.Length() < vl.rp.GetCopyCount() {
+				glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
+				return vl.removeFromWritable(vid)
+			}
+		}
+	}
+	return false
+}
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+	vl.accessLock.Lock()
+	defer vl.accessLock.Unlock()
+
+	vl.vid2location[vid].Set(dn)
+	if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
+		return vl.setVolumeWritable(vid)
+	}
+	return false
+}
+
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+	vl.accessLock.Lock()
+	defer vl.accessLock.Unlock()
+
+	// glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
+	return vl.removeFromWritable(vid)
+}
+
+func (vl *VolumeLayout) ToMap() map[string]interface{} {
+	m := make(map[string]interface{})
+	m["replication"] = vl.rp.String()
+	m["ttl"] = vl.ttl.String()
+	m["writables"] = vl.writables
+	//m["locations"] = vl.vid2location
+	return m
+}
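
Note on the selection logic: when a data center is specified, PickForWrite walks every replica location of every writable volume and keeps one candidate via k=1 reservoir sampling (counter++; rand.Intn(counter) < 1), so each matching entry ends up equally likely to be chosen without building an intermediate filtered list. The standalone Go sketch below illustrates that pattern in isolation; pickOne and match are hypothetical names used only for this example and are not part of the commit.

// Standalone sketch, not part of the commit above: it isolates the k=1
// reservoir-sampling pattern used by PickForWrite. pickOne and match are
// illustrative names only.
package main

import (
	"fmt"
	"math/rand"
)

// pickOne returns one element chosen uniformly at random from the candidates
// that satisfy match, using the same counter / rand.Intn(counter) < 1 pattern
// as PickForWrite.
func pickOne(candidates []int, match func(int) bool) (picked int, ok bool) {
	counter := 0
	for _, c := range candidates {
		if !match(c) {
			continue
		}
		counter++
		// The n-th matching candidate replaces the current pick with
		// probability 1/n, which keeps the final choice uniform over all
		// matches without materializing a filtered slice first.
		if rand.Intn(counter) < 1 {
			picked, ok = c, true
		}
	}
	return
}

func main() {
	vols := []int{1, 2, 3, 4, 5, 6}
	counts := map[int]int{}
	for i := 0; i < 60000; i++ {
		if v, ok := pickOne(vols, func(v int) bool { return v%2 == 0 }); ok {
			counts[v]++
		}
	}
	fmt.Println(counts) // roughly 20000 each for 2, 4 and 6
}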
