diff options
Diffstat (limited to 'go/topology')
| -rw-r--r-- | go/topology/collection.go | 11 | ||||
| -rw-r--r-- | go/topology/data_node.go | 24 | ||||
| -rw-r--r-- | go/topology/topology.go | 11 | ||||
| -rw-r--r-- | go/topology/topology_event_handling.go | 4 | ||||
| -rw-r--r-- | go/topology/volume_layout.go | 18 |
5 files changed, 60 insertions, 8 deletions
diff --git a/go/topology/collection.go b/go/topology/collection.go index 8042369a9..b21122d22 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -36,3 +36,14 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } return nil } + +func (c *Collection) ListVolumeServers() (nodes []*DataNode) { + for _, vl := range c.replicaType2VolumeLayout { + if vl != nil { + if list := vl.ListVolumeServers(); list != nil { + nodes = append(nodes, list...) + } + } + } + return +} diff --git a/go/topology/data_node.go b/go/topology/data_node.go index 3a6edb447..a83647939 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -1,8 +1,8 @@ package topology import ( + "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/storage" - _ "fmt" "strconv" ) @@ -28,12 +28,32 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { if _, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v dn.UpAdjustVolumeCountDelta(1) - dn.UpAdjustActiveVolumeCountDelta(1) + if !v.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(1) + } dn.UpAdjustMaxVolumeId(v.Id) } else { dn.volumes[v.Id] = v } } +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { + actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) + for _, v := range actualVolumes { + actualVolumeMap[v.Id] = v + } + for vid, _ := range dn.volumes { + glog.V(2).Infoln("Checking volume id:", vid) + if _, ok := actualVolumeMap[vid]; !ok { + glog.V(0).Infoln("Deleting volume id:", vid) + delete(dn.volumes, vid) + dn.UpAdjustVolumeCountDelta(-1) + dn.UpAdjustActiveVolumeCountDelta(-1) + } + } //TODO: adjust max volume id, if need to reclaim volume ids + for _, v := range actualVolumes { + dn.AddOrUpdateVolume(v) + } +} func (dn *DataNode) GetDataCenter() *DataCenter { return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) } diff --git a/go/topology/topology.go b/go/topology/topology.go index 1426f7a12..d72879035 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -99,6 +99,15 @@ func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPla return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) } +func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { + collection, ok = t.collectionMap[collectionName] + return +} + +func (t *Topology) DeleteCollection(collectionName string) { + delete(t.collectionMap, collectionName) +} + func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn) } @@ -112,8 +121,8 @@ func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, t.UnRegisterDataNode(dn) } dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) + dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { - dn.AddOrUpdateVolume(v) t.RegisterVolumeLayout(&v, dn) } } diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 5740c9a03..710e7b2ae 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -46,7 +46,9 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { return false } for _, dn := range vl.vid2location[volumeInfo.Id].list { - dn.UpAdjustActiveVolumeCountDelta(-1) + if !volumeInfo.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(-1) + } } return true } diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 40628b4a0..1a35faa5c 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -8,6 +8,7 @@ import ( "sync" ) +// mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *storage.ReplicaPlacement vid2location map[storage.VolumeId]*VolumeLocationList @@ -56,6 +57,13 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { return nil } +func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { + for _, location := range vl.vid2location { + nodes = append(nodes, location.list...) + } + return +} + func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) { len_writers := len(vl.writables) if len_writers <= 0 { @@ -134,10 +142,12 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) vl.accessLock.Lock() defer vl.accessLock.Unlock() - if vl.vid2location[vid].Remove(dn) { - if vl.vid2location[vid].Length() < vl.rp.GetCopyCount() { - glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.rp.GetCopyCount()) - return vl.removeFromWritable(vid) + if location, ok := vl.vid2location[vid]; ok { + if location.Remove(dn) { + if location.Length() < vl.rp.GetCopyCount() { + glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount()) + return vl.removeFromWritable(vid) + } } } return false |
