diff options
| author | Chris Lu <chris.lu@gmail.com> | 2014-09-20 12:38:59 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2014-09-20 12:38:59 -0700 |
| commit | b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999 (patch) | |
| tree | 719442dc72cc30958e54e4f7e59076796b6775e9 /go/topology/topology.go | |
| parent | a092794804b2f7cbd656e439305d29bfa96ad2b9 (diff) | |
| download | seaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.tar.xz seaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.zip | |
add TTL support
The volume TTL and file TTL are not necessarily the same. as long as
file TTL is smaller than volume TTL, it'll be fine.
volume TTL is used when assigning file id, e.g.
http://.../dir/assign?ttl=3h
file TTL is used when uploading
Diffstat (limited to 'go/topology/topology.go')
| -rw-r--r-- | go/topology/topology.go | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/go/topology/topology.go b/go/topology/topology.go index f1daffb53..acdef5e36 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId { } func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { _, ok := t.collectionMap[collectionName] if !ok { t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl) } func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { @@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) { } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) +} +func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { + glog.Infof("removing volume info:%+v", v) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) } -func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { +func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { t.Sequence.SetMax(*joinMessage.MaxFileKey) dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) dc := t.GetOrCreateDataCenter(dcName) @@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) } } - dn.UpdateVolumes(volumeInfos) + deletedVolumes := dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { t.RegisterVolumeLayout(v, dn) } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } } func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { |
