diff options
| author | yourchanges <yourchanges@gmail.com> | 2014-10-21 15:50:48 +0800 |
|---|---|---|
| committer | yourchanges <yourchanges@gmail.com> | 2014-10-21 15:50:48 +0800 |
| commit | f7bcd8e958ef185baeca0c455a397d49fcb62256 (patch) | |
| tree | bb2a2de4fcb7f9ac7e1d65a78e82cbe1f5f17e36 /go/topology/topology.go | |
| parent | 78ccbbf3d0766e1ececf91fa00766d1ed33050cc (diff) | |
| parent | a3c17f17b144c4298409fddda2d6bc4b39d38ca5 (diff) | |
| download | seaweedfs-f7bcd8e958ef185baeca0c455a397d49fcb62256.tar.xz seaweedfs-f7bcd8e958ef185baeca0c455a397d49fcb62256.zip | |
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'go/topology/topology.go')
| -rw-r--r-- | go/topology/topology.go | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/go/topology/topology.go b/go/topology/topology.go index f1daffb53..c90e8de0b 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -1,10 +1,10 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/sequence" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/sequence" + "github.com/chrislusf/weed-fs/go/storage" "errors" "github.com/goraft/raft" "io/ioutil" @@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId { } func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { _, ok := t.collectionMap[collectionName] if !ok { t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl) } func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { @@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) { } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) +} +func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { + glog.Infof("removing volume info:%+v", v) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) } -func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { +func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { t.Sequence.SetMax(*joinMessage.MaxFileKey) dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) dc := t.GetOrCreateDataCenter(dcName) @@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) } } - dn.UpdateVolumes(volumeInfos) + deletedVolumes := dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { t.RegisterVolumeLayout(v, dn) } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } } func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { |
