diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2021-02-18 13:57:34 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-02-18 13:57:34 +0800 |
| commit | c8f56f5712c1efffc46de95a8057ed09c21da2db (patch) | |
| tree | bc3330e274901d782395b7396cb54d7cc42608b1 /weed/topology/topology.go | |
| parent | 12a78335860c4b1e220748e4adc4097050af5272 (diff) | |
| parent | 3575d41009e4367658e75e6ae780c6260b80daf9 (diff) | |
| download | seaweedfs-c8f56f5712c1efffc46de95a8057ed09c21da2db.tar.xz seaweedfs-c8f56f5712c1efffc46de95a8057ed09c21da2db.zip | |
Merge pull request #2 from chrislusf/master
Diffstat (limited to 'weed/topology/topology.go')
| -rw-r--r-- | weed/topology/topology.go | 34 |
1 files changed, 25 insertions, 9 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go index bde72cf09..08ebd24fd 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -3,6 +3,7 @@ package topology import ( "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" "time" @@ -45,6 +46,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls t.id = NodeId(id) t.nodeType = "Topology" t.NodeImpl.value = t + t.diskUsages = newDiskUsages() t.children = make(map[NodeId]Node) t.collectionMap = util.NewConcurrentReadMap() t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) @@ -121,12 +123,12 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) { } func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option) if err != nil { return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) } @@ -137,10 +139,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin) - }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) + }).(*Collection).GetOrCreateVolumeLayout(rp, ttl, diskType) } func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) { @@ -176,17 +178,30 @@ func (t *Topology) DeleteCollection(collectionName string) { t.collectionMap.Delete(collectionName) } +func (t *Topology) DeleteLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) { + collection, found := t.FindCollection(collectionName) + if !found { + return + } + collection.DeleteVolumeLayout(rp, ttl, diskType) + if len(collection.storageType2VolumeLayout.Items()) == 0 { + t.DeleteCollection(collectionName) + } +} + func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.RegisterVolume(&v, dn) vl.EnsureCorrectWritables(&v) } func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - glog.Infof("removing volume info:%+v", v) - volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + glog.Infof("removing volume info: %+v", v) + diskType := types.ToDiskType(v.DiskType) + volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) volumeLayout.UnRegisterVolume(&v, dn) if volumeLayout.isEmpty() { - t.DeleteCollection(v.Collection) + t.DeleteLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) } } @@ -222,7 +237,8 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati t.UnRegisterVolumeLayout(v, dn) } for _, v := range changedVolumes { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.EnsureCorrectWritables(&v) } return |
