aboutsummaryrefslogtreecommitdiff
path: root/go/topology/topology.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-09-20 12:38:59 -0700
committerChris Lu <chris.lu@gmail.com>2014-09-20 12:38:59 -0700
commitb9aee2defbc2f5aafbc3ea049fbe2ab5f3320999 (patch)
tree719442dc72cc30958e54e4f7e59076796b6775e9 /go/topology/topology.go
parenta092794804b2f7cbd656e439305d29bfa96ad2b9 (diff)
downloadseaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.tar.xz
seaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.zip
add TTL support
The volume TTL and file TTL are not necessarily the same. as long as file TTL is smaller than volume TTL, it'll be fine. volume TTL is used when assigning file id, e.g. http://.../dir/assign?ttl=3h file TTL is used when uploading
Diffstat (limited to 'go/topology/topology.go')
-rw-r--r--go/topology/topology.go21
1 files changed, 14 insertions, 7 deletions
diff --git a/go/topology/topology.go b/go/topology/topology.go
index f1daffb53..acdef5e36 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
}
func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
- vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
- return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
+ return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
}
func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
@@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) {
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
+}
+func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+ glog.Infof("removing volume info:%+v", v)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
-func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
+func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
@@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
- dn.UpdateVolumes(volumeInfos)
+ deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
+ for _, v := range deletedVolumes {
+ t.UnRegisterVolumeLayout(v, dn)
+ }
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {