aboutsummaryrefslogtreecommitdiff
path: root/go/topology/topology.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/topology/topology.go')
-rw-r--r--go/topology/topology.go21
1 files changed, 14 insertions, 7 deletions
diff --git a/go/topology/topology.go b/go/topology/topology.go
index f1daffb53..acdef5e36 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
}
func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
- vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
- return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
+ return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
}
func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
@@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) {
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
+}
+func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+ glog.Infof("removing volume info:%+v", v)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
-func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
+func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
@@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
- dn.UpdateVolumes(volumeInfos)
+ deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
+ for _, v := range deletedVolumes {
+ t.UnRegisterVolumeLayout(v, dn)
+ }
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {