Diffstat (limited to 'weed/topology')
| Mode | File | Lines changed |
| --- | --- | --- |
| -rw-r--r-- | weed/topology/allocate_volume.go | 1 |
| -rw-r--r-- | weed/topology/collection.go | 19 |
| -rw-r--r-- | weed/topology/data_center.go | 15 |
| -rw-r--r-- | weed/topology/data_node.go | 177 |
| -rw-r--r-- | weed/topology/data_node_ec.go | 124 |
| -rw-r--r-- | weed/topology/disk.go | 289 |
| -rw-r--r-- | weed/topology/disk_ec.go | 84 |
| -rw-r--r-- | weed/topology/node.go | 150 |
| -rw-r--r-- | weed/topology/rack.go | 20 |
| -rw-r--r-- | weed/topology/store_replicate.go | 12 |
| -rw-r--r-- | weed/topology/topology.go | 34 |
| -rw-r--r-- | weed/topology/topology_ec.go | 3 |
| -rw-r--r-- | weed/topology/topology_event_handling.go | 23 |
| -rw-r--r-- | weed/topology/topology_map.go | 16 |
| -rw-r--r-- | weed/topology/topology_test.go | 64 |
| -rw-r--r-- | weed/topology/volume_growth.go | 33 |
| -rw-r--r-- | weed/topology/volume_growth_test.go | 8 |
| -rw-r--r-- | weed/topology/volume_layout.go | 5 |
18 files changed, 753 insertions, 324 deletions
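The central change in the diff below replaces the five per-node atomic counters (`volumeCount`, `remoteVolumeCount`, `activeVolumeCount`, `ecShardCount`, `maxVolumeCount`) and their individual `UpAdjust*CountDelta` methods with a per-disk-type `DiskUsages` map on every node, plus a single `UpAdjustDiskUsageDelta` that bubbles a delta up the parent chain. The sketch below is a minimal, self-contained distillation of that pattern, with names trimmed from the new `disk.go` and `node.go`; the real structs also carry `sync.RWMutex` locking, live inside `NodeImpl`, and key on `types.DiskType`, none of which is reproduced here.

```go
package main

import "fmt"

// DiskType stands in for types.DiskType; "" is the default hard-drive tag.
type DiskType string

// DiskUsageCounts carries the per-disk-type tallies that previously lived
// as separate atomic fields directly on NodeImpl.
type DiskUsageCounts struct {
	volumeCount, remoteVolumeCount, activeVolumeCount, ecShardCount, maxVolumeCount int64
}

func (a *DiskUsageCounts) addDiskUsageCounts(b *DiskUsageCounts) {
	a.volumeCount += b.volumeCount
	a.remoteVolumeCount += b.remoteVolumeCount
	a.activeVolumeCount += b.activeVolumeCount
	a.ecShardCount += b.ecShardCount
	a.maxVolumeCount += b.maxVolumeCount
}

// DiskUsages groups the counters by disk type (locking elided in this sketch).
type DiskUsages struct {
	usages map[DiskType]*DiskUsageCounts
}

func newDiskUsages() *DiskUsages {
	return &DiskUsages{usages: make(map[DiskType]*DiskUsageCounts)}
}

func (d *DiskUsages) getOrCreateDisk(t DiskType) *DiskUsageCounts {
	if u, ok := d.usages[t]; ok {
		return u
	}
	u := &DiskUsageCounts{}
	d.usages[t] = u
	return u
}

// node is a stripped-down stand-in for NodeImpl: a delta is applied locally,
// then propagated to the parent, replacing the old per-counter UpAdjust calls.
type node struct {
	name   string
	usages *DiskUsages
	parent *node
}

func (n *node) UpAdjustDiskUsageDelta(delta *DiskUsages) {
	for t, du := range delta.usages {
		n.usages.getOrCreateDisk(t).addDiskUsageCounts(du)
	}
	if n.parent != nil {
		n.parent.UpAdjustDiskUsageDelta(delta)
	}
}

func main() {
	topo := &node{name: "topology", usages: newDiskUsages()}
	dn := &node{name: "dn1", usages: newDiskUsages(), parent: topo}

	// Registering one writable volume on an "ssd" disk becomes a delta that
	// bubbles from the data node up to the topology root.
	delta := newDiskUsages()
	d := delta.getOrCreateDisk("ssd")
	d.volumeCount = 1
	d.activeVolumeCount = 1
	dn.UpAdjustDiskUsageDelta(delta)

	fmt.Println(topo.usages.usages["ssd"].volumeCount) // 1
}
```

This is also why the diff can drop `GetMaxVolumeCount`/`FreeSpace` from the per-level `ToMap`/`To*Info` methods: each level now reports `DiskInfos` built from its own aggregated `DiskUsages`.
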
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index e5dc48652..39c24ab04 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -24,6 +24,7 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol Ttl: option.Ttl.String(), Preallocate: option.Prealloacte, MemoryMapMaxSizeMb: option.MemoryMapMaxSizeMb, + DiskType: string(option.DiskType), }) return deleteErr }) diff --git a/weed/topology/collection.go b/weed/topology/collection.go index 5b410d1eb..a14b68851 100644 --- a/weed/topology/collection.go +++ b/weed/topology/collection.go @@ -2,6 +2,7 @@ package topology import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" @@ -29,17 +30,31 @@ func (c *Collection) String() string { return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) } -func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { +func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout { keyString := rp.String() if ttl != nil { keyString += ttl.String() } + if diskType != types.HardDriveType { + keyString += string(diskType) + } vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { - return NewVolumeLayout(rp, ttl, c.volumeSizeLimit, c.replicationAsMin) + return NewVolumeLayout(rp, ttl, diskType, c.volumeSizeLimit, c.replicationAsMin) }) return vl.(*VolumeLayout) } +func (c *Collection) DeleteVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) { + keyString := rp.String() + if ttl != nil { + keyString += ttl.String() + } + if diskType != types.HardDriveType { + keyString += string(diskType) + } + c.storageType2VolumeLayout.Delete(keyString) +} + func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode { for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go index dc3accb71..60d91ba6d 100644 --- a/weed/topology/data_center.go +++ b/weed/topology/data_center.go @@ -1,6 +1,8 @@ package topology -import "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +import ( + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +) type DataCenter struct { NodeImpl @@ -10,6 +12,7 @@ func NewDataCenter(id string) *DataCenter { dc := &DataCenter{} dc.id = NodeId(id) dc.nodeType = "DataCenter" + dc.diskUsages = newDiskUsages() dc.children = make(map[NodeId]Node) dc.NodeImpl.value = dc return dc @@ -30,8 +33,6 @@ func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { func (dc *DataCenter) ToMap() interface{} { m := make(map[string]interface{}) m["Id"] = dc.Id() - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() var racks []interface{} for _, c := range dc.Children() { rack := c.(*Rack) @@ -43,12 +44,8 @@ func (dc *DataCenter) ToMap() interface{} { func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo { m := &master_pb.DataCenterInfo{ - Id: string(dc.Id()), - VolumeCount: uint64(dc.GetVolumeCount()), - MaxVolumeCount: uint64(dc.GetMaxVolumeCount()), - FreeVolumeCount: uint64(dc.FreeSpace()), - ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()), - RemoteVolumeCount: uint64(dc.GetRemoteVolumeCount()), + Id: string(dc.Id()), + DiskInfos: 
dc.diskUsages.ToDiskInfo(), } for _, c := range dc.Children() { rack := c.(*Rack) diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index eaed51654..1a0ebf761 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,13 +2,11 @@ package topology import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/util" - "strconv" - "sync" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" @@ -16,29 +14,26 @@ import ( type DataNode struct { NodeImpl - volumes map[needle.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds - ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo - ecShardsLock sync.RWMutex + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds } func NewDataNode(id string) *DataNode { - s := &DataNode{} - s.id = NodeId(id) - s.nodeType = "DataNode" - s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) - s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) - s.NodeImpl.value = s - return s + dn := &DataNode{} + dn.id = NodeId(id) + dn.nodeType = "DataNode" + dn.diskUsages = newDiskUsages() + dn.children = make(map[NodeId]Node) + dn.NodeImpl.value = dn + return dn } func (dn *DataNode) String() string { dn.RLock() defer dn.RUnlock() - return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) + return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl) } func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { @@ -47,33 +42,23 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO return dn.doAddOrUpdateVolume(v) } -func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { - if oldV, ok := dn.volumes[v.Id]; !ok { - dn.volumes[v.Id] = v - dn.UpAdjustVolumeCountDelta(1) - if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(1) - } - if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(1) - } - dn.UpAdjustMaxVolumeId(v.Id) - isNew = true - } else { - if oldV.IsRemote() != v.IsRemote() { - if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(1) - } - if oldV.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) - } - } - isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly - dn.volumes[v.Id] = v +func (dn *DataNode) getOrCreateDisk(diskType string) *Disk { + c, found := dn.children[NodeId(diskType)] + if !found { + c = NewDisk(diskType) + dn.doLinkChildNode(c) } - return + disk := c.(*Disk) + return disk } +func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { + disk := dn.getOrCreateDisk(v.DiskType) + return disk.AddOrUpdateVolume(v) +} + +// UpdateVolumes detects new/deleted/changed volumes on a volume server +// used in master to notify master clients of these changes. 
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) @@ -84,18 +69,26 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume dn.Lock() defer dn.Unlock() - for vid, v := range dn.volumes { + existingVolumes := dn.getVolumes() + + for _, v := range existingVolumes { + vid := v.Id if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) - delete(dn.volumes, vid) + disk := dn.getOrCreateDisk(v.DiskType) + delete(disk.volumes, vid) deletedVolumes = append(deletedVolumes, v) - dn.UpAdjustVolumeCountDelta(-1) + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage.volumeCount = -1 if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) + deltaDiskUsage.remoteVolumeCount = -1 } if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(-1) + deltaDiskUsage.activeVolumeCount = -1 } + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } } for _, v := range actualVolumes { @@ -115,14 +108,19 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu defer dn.Unlock() for _, v := range deletedVolumes { - delete(dn.volumes, v.Id) - dn.UpAdjustVolumeCountDelta(-1) + disk := dn.getOrCreateDisk(v.DiskType) + delete(disk.volumes, v.Id) + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage.volumeCount = -1 if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) + deltaDiskUsage.remoteVolumeCount = -1 } if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(-1) + deltaDiskUsage.activeVolumeCount = -1 } + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } for _, v := range newVolumes { dn.doAddOrUpdateVolume(v) @@ -130,20 +128,47 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu return } +func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) { + deltaDiskUsages := newDiskUsages() + for diskType, maxVolumeCount := range maxVolumeCounts { + if maxVolumeCount == 0 { + // the volume server may have set the max to zero + continue + } + dt := types.ToDiskType(diskType) + currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt) + if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) { + continue + } + disk := dn.getOrCreateDisk(dt.String()) + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt) + deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } +} + func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { dn.RLock() - for _, v := range dn.volumes { - ret = append(ret, v) + for _, c := range dn.children { + disk := c.(*Disk) + ret = append(ret, disk.GetVolumes()...) 
} dn.RUnlock() return ret } -func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) { dn.RLock() defer dn.RUnlock() - vInfo, ok := dn.volumes[id] - if ok { + found := false + for _, c := range dn.children { + disk := c.(*Disk) + vInfo, found = disk.volumes[id] + if found { + break + } + } + if found { return vInfo, nil } else { return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found") @@ -181,29 +206,19 @@ func (dn *DataNode) Url() string { func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() - ret["Volumes"] = dn.GetVolumeCount() - ret["VolumeIds"] = dn.GetVolumeIds() - ret["EcShards"] = dn.GetEcShardCount() - ret["Max"] = dn.GetMaxVolumeCount() - ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl + ret["Disks"] = dn.diskUsages.ToMap() return ret } func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { m := &master_pb.DataNodeInfo{ - Id: string(dn.Id()), - VolumeCount: uint64(dn.GetVolumeCount()), - MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), - FreeVolumeCount: uint64(dn.FreeSpace()), - ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), - RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()), + Id: string(dn.Id()), + DiskInfos: make(map[string]*master_pb.DiskInfo), } - for _, v := range dn.GetVolumes() { - m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) - } - for _, ecv := range dn.GetEcShards() { - m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + for _, c := range dn.Children() { + disk := c.(*Disk) + m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo() } return m } @@ -212,11 +227,21 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { func (dn *DataNode) GetVolumeIds() string { dn.RLock() defer dn.RUnlock() - ids := make([]int, 0, len(dn.volumes)) + existingVolumes := dn.getVolumes() + ids := make([]int, 0, len(existingVolumes)) - for k := range dn.volumes { + for k := range existingVolumes { ids = append(ids, int(k)) } return util.HumanReadableIntsMax(100, ids...) } + +func (dn *DataNode) getVolumes() []storage.VolumeInfo { + var existingVolumes []storage.VolumeInfo + for _, c := range dn.children { + disk := c.(*Disk) + existingVolumes = append(existingVolumes, disk.GetVolumes()...) + } + return existingVolumes +} diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index 75c8784fe..df1b6d658 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -3,12 +3,14 @@ package topology import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { dn.RLock() - for _, ecVolumeInfo := range dn.ecShards { - ret = append(ret, ecVolumeInfo) + for _, c := range dn.children { + disk := c.(*Disk) + ret = append(ret, disk.GetEcShards()...) 
} dn.RUnlock() return ret @@ -21,10 +23,17 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) actualEcShardMap[ecShards.VolumeId] = ecShards } + existingEcShards := dn.GetEcShards() + // found out the newShards and deletedShards var newShardCount, deletedShardCount int - dn.ecShardsLock.RLock() - for vid, ecShards := range dn.ecShards { + for _, ecShards := range existingEcShards { + + disk := dn.getOrCreateDisk(ecShards.DiskType) + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) + + vid := ecShards.VolumeId if actualEcShards, ok := actualEcShardMap[vid]; !ok { // dn registered ec shards not found in the new set of ec shards deletedShards = append(deletedShards, ecShards) @@ -42,26 +51,61 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) deletedShardCount += d.ShardIdCount() } } + + deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount) + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } for _, ecShards := range actualShards { - if _, found := dn.ecShards[ecShards.VolumeId]; !found { + + disk := dn.getOrCreateDisk(ecShards.DiskType) + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) + + if !dn.hasEcShards(ecShards.VolumeId) { newShards = append(newShards, ecShards) newShardCount += ecShards.ShardIdCount() } + + deltaDiskUsage.ecShardCount = int64(newShardCount) + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } - dn.ecShardsLock.RUnlock() if len(newShards) > 0 || len(deletedShards) > 0 { // if changed, set to the new ec shard map - dn.ecShardsLock.Lock() - dn.ecShards = actualEcShardMap - dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount)) - dn.ecShardsLock.Unlock() + dn.doUpdateEcShards(actualShards) } return } +func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) { + dn.RLock() + defer dn.RUnlock() + for _, c := range dn.children { + disk := c.(*Disk) + _, found = disk.ecShards[volumeId] + if found { + return + } + } + return +} + +func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) { + dn.Lock() + for _, c := range dn.children { + disk := c.(*Disk) + disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) + } + for _, shard := range actualShards { + disk := dn.getOrCreateDisk(shard.DiskType) + disk.ecShards[shard.VolumeId] = shard + } + dn.Unlock() +} + func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) { for _, newShard := range newShards { @@ -75,61 +119,25 @@ func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_codi } func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { - dn.ecShardsLock.Lock() - defer dn.ecShardsLock.Unlock() - - delta := 0 - if existing, ok := dn.ecShards[s.VolumeId]; !ok { - dn.ecShards[s.VolumeId] = s - delta = s.ShardBits.ShardIdCount() - } else { - oldCount := existing.ShardBits.ShardIdCount() - existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) - delta = existing.ShardBits.ShardIdCount() - oldCount - } - - dn.UpAdjustEcShardCountDelta(int64(delta)) - + disk := dn.getOrCreateDisk(s.DiskType) + disk.AddOrUpdateEcShard(s) } func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { - dn.ecShardsLock.Lock() - defer dn.ecShardsLock.Unlock() - - if existing, ok := dn.ecShards[s.VolumeId]; ok { - oldCount := existing.ShardBits.ShardIdCount() - 
existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) - delta := existing.ShardBits.ShardIdCount() - oldCount - dn.UpAdjustEcShardCountDelta(int64(delta)) - if existing.ShardBits.ShardIdCount() == 0 { - delete(dn.ecShards, s.VolumeId) - } - } - + disk := dn.getOrCreateDisk(s.DiskType) + disk.DeleteEcShard(s) } -func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { +func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) { - // check whether normal volumes has this volume id dn.RLock() - _, ok := dn.volumes[id] - if ok { - hasVolumeId = true - } - dn.RUnlock() - - if hasVolumeId { - return - } - - // check whether ec shards has this volume id - dn.ecShardsLock.RLock() - _, ok = dn.ecShards[id] - if ok { - hasVolumeId = true + defer dn.RUnlock() + for _, c := range dn.children { + disk := c.(*Disk) + if disk.HasVolumesById(volumeId) { + return true + } } - dn.ecShardsLock.RUnlock() - - return + return false } diff --git a/weed/topology/disk.go b/weed/topology/disk.go new file mode 100644 index 000000000..37d5e1272 --- /dev/null +++ b/weed/topology/disk.go @@ -0,0 +1,289 @@ +package topology + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "sync" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + + "github.com/chrislusf/seaweedfs/weed/storage" +) + +type Disk struct { + NodeImpl + volumes map[needle.VolumeId]storage.VolumeInfo + ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo + ecShardsLock sync.RWMutex +} + +func NewDisk(diskType string) *Disk { + s := &Disk{} + s.id = NodeId(diskType) + s.nodeType = "Disk" + s.diskUsages = newDiskUsages() + s.volumes = make(map[needle.VolumeId]storage.VolumeInfo, 2) + s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo, 2) + s.NodeImpl.value = s + return s +} + +type DiskUsages struct { + sync.RWMutex + usages map[types.DiskType]*DiskUsageCounts +} + +func newDiskUsages() *DiskUsages { + return &DiskUsages{ + usages: make(map[types.DiskType]*DiskUsageCounts), + } +} + +func (d *DiskUsages) negative() *DiskUsages { + d.RLock() + defer d.RUnlock() + t := newDiskUsages() + for diskType, b := range d.usages { + a := t.getOrCreateDisk(diskType) + a.volumeCount = -b.volumeCount + a.remoteVolumeCount = -b.remoteVolumeCount + a.activeVolumeCount = -b.activeVolumeCount + a.ecShardCount = -b.ecShardCount + a.maxVolumeCount = -b.maxVolumeCount + + } + return t +} + +func (d *DiskUsages) ToMap() interface{} { + d.RLock() + defer d.RUnlock() + ret := make(map[string]interface{}) + for diskType, diskUsage := range d.usages { + ret[diskType.String()] = diskUsage.ToMap() + } + return ret +} + +func (d *DiskUsages) ToDiskInfo() map[string]*master_pb.DiskInfo { + ret := make(map[string]*master_pb.DiskInfo) + for diskType, diskUsageCounts := range d.usages { + m := &master_pb.DiskInfo{ + VolumeCount: uint64(diskUsageCounts.volumeCount), + MaxVolumeCount: uint64(diskUsageCounts.maxVolumeCount), + FreeVolumeCount: uint64(diskUsageCounts.maxVolumeCount - diskUsageCounts.volumeCount), + ActiveVolumeCount: uint64(diskUsageCounts.activeVolumeCount), + RemoteVolumeCount: uint64(diskUsageCounts.remoteVolumeCount), + } + ret[string(diskType)] = m + } + return ret +} + +func (d *DiskUsages) FreeSpace() (freeSpace int64) { + d.RLock() + defer d.RUnlock() + for _, diskUsage := range d.usages { + freeSpace += 
diskUsage.FreeSpace() + } + return +} + +func (d *DiskUsages) GetMaxVolumeCount() (maxVolumeCount int64) { + d.RLock() + defer d.RUnlock() + for _, diskUsage := range d.usages { + maxVolumeCount += diskUsage.maxVolumeCount + } + return +} + +type DiskUsageCounts struct { + volumeCount int64 + remoteVolumeCount int64 + activeVolumeCount int64 + ecShardCount int64 + maxVolumeCount int64 +} + +func (a *DiskUsageCounts) addDiskUsageCounts(b *DiskUsageCounts) { + a.volumeCount += b.volumeCount + a.remoteVolumeCount += b.remoteVolumeCount + a.activeVolumeCount += b.activeVolumeCount + a.ecShardCount += b.ecShardCount + a.maxVolumeCount += b.maxVolumeCount +} + +func (a *DiskUsageCounts) FreeSpace() int64 { + freeVolumeSlotCount := a.maxVolumeCount + a.remoteVolumeCount - a.volumeCount + if a.ecShardCount > 0 { + freeVolumeSlotCount = freeVolumeSlotCount - a.ecShardCount/erasure_coding.DataShardsCount - 1 + } + return freeVolumeSlotCount +} + +func (a *DiskUsageCounts) minus(b *DiskUsageCounts) *DiskUsageCounts { + return &DiskUsageCounts{ + volumeCount: a.volumeCount - b.volumeCount, + remoteVolumeCount: a.remoteVolumeCount - b.remoteVolumeCount, + activeVolumeCount: a.activeVolumeCount - b.activeVolumeCount, + ecShardCount: a.ecShardCount - b.ecShardCount, + maxVolumeCount: a.maxVolumeCount - b.maxVolumeCount, + } +} + +func (diskUsage *DiskUsageCounts) ToMap() interface{} { + ret := make(map[string]interface{}) + ret["Volumes"] = diskUsage.volumeCount + ret["EcShards"] = diskUsage.ecShardCount + ret["Max"] = diskUsage.maxVolumeCount + ret["Free"] = diskUsage.FreeSpace() + return ret +} + +func (du *DiskUsages) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts { + du.Lock() + defer du.Unlock() + t, found := du.usages[diskType] + if found { + return t + } + t = &DiskUsageCounts{} + du.usages[diskType] = t + return t +} + +func (d *Disk) String() string { + d.RLock() + defer d.RUnlock() + return fmt.Sprintf("Disk:%s, volumes:%v, ecShards:%v", d.NodeImpl.String(), d.volumes, d.ecShards) +} + +func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { + d.Lock() + defer d.Unlock() + return d.doAddOrUpdateVolume(v) +} + +func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + if oldV, ok := d.volumes[v.Id]; !ok { + d.volumes[v.Id] = v + deltaDiskUsage.volumeCount = 1 + if v.IsRemote() { + deltaDiskUsage.remoteVolumeCount = 1 + } + if !v.ReadOnly { + deltaDiskUsage.activeVolumeCount = 1 + } + d.UpAdjustMaxVolumeId(v.Id) + d.UpAdjustDiskUsageDelta(deltaDiskUsages) + isNew = true + } else { + if oldV.IsRemote() != v.IsRemote() { + if v.IsRemote() { + deltaDiskUsage.remoteVolumeCount = 1 + } + if oldV.IsRemote() { + deltaDiskUsage.remoteVolumeCount = -1 + } + d.UpAdjustDiskUsageDelta(deltaDiskUsages) + } + isChangedRO = d.volumes[v.Id].ReadOnly != v.ReadOnly + d.volumes[v.Id] = v + } + return +} + +func (d *Disk) GetVolumes() (ret []storage.VolumeInfo) { + d.RLock() + for _, v := range d.volumes { + ret = append(ret, v) + } + d.RUnlock() + return ret +} + +func (d *Disk) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { + d.RLock() + defer d.RUnlock() + vInfo, ok := d.volumes[id] + if ok { + return vInfo, nil + } else { + return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found") + } +} + +func (d *Disk) GetDataCenter() *DataCenter { + dn := d.Parent() + rack := dn.Parent() + dcNode := rack.Parent() + 
dcValue := dcNode.GetValue() + return dcValue.(*DataCenter) +} + +func (d *Disk) GetRack() *Rack { + return d.Parent().Parent().(*NodeImpl).value.(*Rack) +} + +func (d *Disk) GetTopology() *Topology { + p := d.Parent() + for p.Parent() != nil { + p = p.Parent() + } + t := p.(*Topology) + return t +} + +func (d *Disk) ToMap() interface{} { + ret := make(map[string]interface{}) + diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + ret["Volumes"] = diskUsage.volumeCount + ret["VolumeIds"] = d.GetVolumeIds() + ret["EcShards"] = diskUsage.ecShardCount + ret["Max"] = diskUsage.maxVolumeCount + ret["Free"] = d.FreeSpace() + return ret +} + +func (d *Disk) FreeSpace() int64 { + t := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + return t.FreeSpace() +} + +func (d *Disk) ToDiskInfo() *master_pb.DiskInfo { + diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + m := &master_pb.DiskInfo{ + Type: string(d.Id()), + VolumeCount: uint64(diskUsage.volumeCount), + MaxVolumeCount: uint64(diskUsage.maxVolumeCount), + FreeVolumeCount: uint64(diskUsage.maxVolumeCount - diskUsage.volumeCount), + ActiveVolumeCount: uint64(diskUsage.activeVolumeCount), + RemoteVolumeCount: uint64(diskUsage.remoteVolumeCount), + } + for _, v := range d.GetVolumes() { + m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) + } + for _, ecv := range d.GetEcShards() { + m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + } + return m +} + +// GetVolumeIds returns the human readable volume ids limited to count of max 100. +func (d *Disk) GetVolumeIds() string { + d.RLock() + defer d.RUnlock() + ids := make([]int, 0, len(d.volumes)) + + for k := range d.volumes { + ids = append(ids, int(k)) + } + + return util.HumanReadableIntsMax(100, ids...) 
+} diff --git a/weed/topology/disk_ec.go b/weed/topology/disk_ec.go new file mode 100644 index 000000000..74a06b47f --- /dev/null +++ b/weed/topology/disk_ec.go @@ -0,0 +1,84 @@ +package topology + +import ( + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func (d *Disk) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { + d.RLock() + for _, ecVolumeInfo := range d.ecShards { + ret = append(ret, ecVolumeInfo) + } + d.RUnlock() + return ret +} + +func (d *Disk) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { + d.ecShardsLock.Lock() + defer d.ecShardsLock.Unlock() + + delta := 0 + if existing, ok := d.ecShards[s.VolumeId]; !ok { + d.ecShards[s.VolumeId] = s + delta = s.ShardBits.ShardIdCount() + } else { + oldCount := existing.ShardBits.ShardIdCount() + existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) + delta = existing.ShardBits.ShardIdCount() - oldCount + } + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + deltaDiskUsage.ecShardCount = int64(delta) + d.UpAdjustDiskUsageDelta(deltaDiskUsages) + +} + +func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { + d.ecShardsLock.Lock() + defer d.ecShardsLock.Unlock() + + if existing, ok := d.ecShards[s.VolumeId]; ok { + oldCount := existing.ShardBits.ShardIdCount() + existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) + delta := existing.ShardBits.ShardIdCount() - oldCount + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + deltaDiskUsage.ecShardCount = int64(delta) + d.UpAdjustDiskUsageDelta(deltaDiskUsages) + + if existing.ShardBits.ShardIdCount() == 0 { + delete(d.ecShards, s.VolumeId) + } + } + +} + +func (d *Disk) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { + + // check whether normal volumes has this volume id + d.RLock() + _, ok := d.volumes[id] + if ok { + hasVolumeId = true + } + d.RUnlock() + + if hasVolumeId { + return + } + + // check whether ec shards has this volume id + d.ecShardsLock.RLock() + _, ok = d.ecShards[id] + if ok { + hasVolumeId = true + } + d.ecShardsLock.RUnlock() + + return + +} diff --git a/weed/topology/node.go b/weed/topology/node.go index 114417edf..95d63972e 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -2,34 +2,25 @@ package topology import ( "errors" - "math/rand" - "strings" - "sync" - "sync/atomic" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "math/rand" + "strings" + "sync" ) type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int64 - ReserveOneVolume(r int64) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) - UpAdjustVolumeCountDelta(volumeCountDelta int64) - UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) - UpAdjustEcShardCountDelta(ecShardCountDelta int64) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) + AvailableSpaceFor(option *VolumeGrowOption) int64 + ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error) + UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) UpAdjustMaxVolumeId(vid needle.VolumeId) + GetDiskUsages() *DiskUsages - GetVolumeCount() int64 - GetEcShardCount() int64 - 
GetActiveVolumeCount() int64 - GetRemoteVolumeCount() int64 - GetMaxVolumeCount() int64 GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -45,24 +36,24 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { - volumeCount int64 - remoteVolumeCount int64 - activeVolumeCount int64 - ecShardCount int64 - maxVolumeCount int64 - id NodeId - parent Node - sync.RWMutex // lock children - children map[NodeId]Node - maxVolumeId needle.VolumeId + diskUsages *DiskUsages + id NodeId + parent Node + sync.RWMutex // lock children + children map[NodeId]Node + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string value interface{} } +func (n *NodeImpl) GetDiskUsages() *DiskUsages { + return n.diskUsages +} + // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { +func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { var totalWeights int64 var errs []string n.RLock() @@ -70,12 +61,12 @@ func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(d candidatesWeights := make([]int64, 0, len(n.children)) //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight. for _, node := range n.children { - if node.FreeSpace() <= 0 { + if node.AvailableSpaceFor(option) <= 0 { continue } - totalWeights += node.FreeSpace() + totalWeights += node.AvailableSpaceFor(option) candidates = append(candidates, node) - candidatesWeights = append(candidatesWeights, node.FreeSpace()) + candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option)) } n.RUnlock() if len(candidates) < numberOfNodes { @@ -142,10 +133,14 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int64 { - freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount - if n.ecShardCount > 0 { - freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 +func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts { + return n.diskUsages.getOrCreateDisk(diskType) +} +func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { + t := n.getOrCreateDisk(option.DiskType) + freeVolumeSlotCount := t.maxVolumeCount + t.remoteVolumeCount - t.volumeCount + if t.ecShardCount > 0 { + freeVolumeSlotCount = freeVolumeSlotCount - t.ecShardCount/erasure_coding.DataShardsCount - 1 } return freeVolumeSlotCount } @@ -166,11 +161,11 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { - freeSpace := node.FreeSpace() + freeSpace := node.AvailableSpaceFor(option) // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue @@ -178,11 +173,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) if r >= freeSpace { r -= freeSpace } else { - if node.IsDataNode() && node.FreeSpace() > 0 { + if 
node.IsDataNode() && node.AvailableSpaceFor(option) > 0 { // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) return node.(*DataNode), nil } - assignedNode, err = node.ReserveOneVolume(r) + assignedNode, err = node.ReserveOneVolume(r, option) if err == nil { return } @@ -191,49 +186,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative - if maxVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative - if volumeCountDelta == 0 { - return +func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative + for diskType, diskUsage := range deltaDiskUsages.usages { + existingDisk := n.getOrCreateDisk(diskType) + existingDisk.addDiskUsageCounts(diskUsage) } - atomic.AddInt64(&n.volumeCount, volumeCountDelta) if n.parent != nil { - n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative - if remoteVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative - if ecShardCountDelta == 0 { - return - } - atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) - if n.parent != nil { - n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) - } -} -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative - if activeVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) + n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages) } } func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative @@ -247,33 +206,18 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int64 { - return n.volumeCount -} -func (n *NodeImpl) GetEcShardCount() int64 { - return n.ecShardCount -} -func (n *NodeImpl) GetRemoteVolumeCount() int64 { - return n.remoteVolumeCount -} -func (n *NodeImpl) GetActiveVolumeCount() int64 { - return n.activeVolumeCount -} -func (n *NodeImpl) GetMaxVolumeCount() int64 { - return n.maxVolumeCount -} func (n *NodeImpl) LinkChildNode(node Node) { n.Lock() defer n.Unlock() + n.doLinkChildNode(node) +} + +func (n *NodeImpl) doLinkChildNode(node Node) { if n.children[node.Id()] == nil { n.children[node.Id()] = node - n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) + n.UpAdjustDiskUsageDelta(node.GetDiskUsages()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) - n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) - n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount()) - n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) - n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) } @@ -286,11 +230,7 @@ func (n *NodeImpl) 
UnlinkChildNode(nodeId NodeId) { if node != nil { node.SetParent(nil) delete(n.children, node.Id()) - n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) - n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount()) - n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) - n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) - n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) + n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative()) glog.V(0).Infoln(n, "removes", node.Id()) } } diff --git a/weed/topology/rack.go b/weed/topology/rack.go index 1921c0c05..8eb2a717c 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -2,6 +2,7 @@ package topology import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/types" "strconv" "time" ) @@ -14,6 +15,7 @@ func NewRack(id string) *Rack { r := &Rack{} r.id = NodeId(id) r.nodeType = "Rack" + r.diskUsages = newDiskUsages() r.children = make(map[NodeId]Node) r.NodeImpl.value = r return r @@ -28,7 +30,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode { +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode { for _, c := range r.Children() { dn := c.(*DataNode) if dn.MatchLocation(ip, port) { @@ -40,17 +42,19 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol dn.Ip = ip dn.Port = port dn.PublicUrl = publicUrl - dn.maxVolumeCount = maxVolumeCount dn.LastSeen = time.Now().Unix() r.LinkChildNode(dn) + for diskType, maxVolumeCount := range maxVolumeCounts { + disk := NewDisk(diskType) + disk.diskUsages.getOrCreateDisk(types.ToDiskType(diskType)).maxVolumeCount = int64(maxVolumeCount) + dn.LinkChildNode(disk) + } return dn } func (r *Rack) ToMap() interface{} { m := make(map[string]interface{}) m["Id"] = r.Id() - m["Max"] = r.GetMaxVolumeCount() - m["Free"] = r.FreeSpace() var dns []interface{} for _, c := range r.Children() { dn := c.(*DataNode) @@ -62,12 +66,8 @@ func (r *Rack) ToMap() interface{} { func (r *Rack) ToRackInfo() *master_pb.RackInfo { m := &master_pb.RackInfo{ - Id: string(r.Id()), - VolumeCount: uint64(r.GetVolumeCount()), - MaxVolumeCount: uint64(r.GetMaxVolumeCount()), - FreeVolumeCount: uint64(r.FreeSpace()), - ActiveVolumeCount: uint64(r.GetActiveVolumeCount()), - RemoteVolumeCount: uint64(r.GetRemoteVolumeCount()), + Id: string(r.Id()), + DiskInfos: r.diskUsages.ToDiskInfo(), } for _, c := range r.Children() { dn := c.(*DataNode) diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 6b4076913..ea0a8c968 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -18,7 +18,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { +func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) @@ -27,7 +27,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { // this is the initial request - remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode) + 
remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterFn) if err != nil { glog.V(0).Infoln(err) return @@ -92,7 +92,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume return } -func ReplicatedDelete(masterNode string, store *storage.Store, +func ReplicatedDelete(masterFn operation.GetMasterFn, store *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (size types.Size, err error) { @@ -101,7 +101,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { - remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode) + remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterFn) if err != nil { glog.V(0).Infoln(err) return @@ -161,7 +161,7 @@ func distributedOperation(locations []operation.Location, store *storage.Store, return ret.Error() } -func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) ( +func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterFn operation.GetMasterFn) ( remoteLocations []operation.Location, err error) { v := s.GetVolume(volumeId) @@ -170,7 +170,7 @@ func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, m } // not on local store, or has replications - lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()) + lookupResult, lookupErr := operation.Lookup(masterFn, volumeId.String()) if lookupErr == nil { selfUrl := s.Ip + ":" + strconv.Itoa(s.Port) for _, location := range lookupResult.Locations { diff --git a/weed/topology/topology.go b/weed/topology/topology.go index bde72cf09..08ebd24fd 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -3,6 +3,7 @@ package topology import ( "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" "time" @@ -45,6 +46,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls t.id = NodeId(id) t.nodeType = "Topology" t.NodeImpl.value = t + t.diskUsages = newDiskUsages() t.children = make(map[NodeId]Node) t.collectionMap = util.NewConcurrentReadMap() t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) @@ -121,12 +123,12 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) { } func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option) if err != nil { return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) } @@ -137,10 +139,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp 
*super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin) - }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) + }).(*Collection).GetOrCreateVolumeLayout(rp, ttl, diskType) } func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) { @@ -176,17 +178,30 @@ func (t *Topology) DeleteCollection(collectionName string) { t.collectionMap.Delete(collectionName) } +func (t *Topology) DeleteLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) { + collection, found := t.FindCollection(collectionName) + if !found { + return + } + collection.DeleteVolumeLayout(rp, ttl, diskType) + if len(collection.storageType2VolumeLayout.Items()) == 0 { + t.DeleteCollection(collectionName) + } +} + func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.RegisterVolume(&v, dn) vl.EnsureCorrectWritables(&v) } func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - glog.Infof("removing volume info:%+v", v) - volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + glog.Infof("removing volume info: %+v", v) + diskType := types.ToDiskType(v.DiskType) + volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) volumeLayout.UnRegisterVolume(&v, dn) if volumeLayout.isEmpty() { - t.DeleteCollection(v.Collection) + t.DeleteLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) } } @@ -222,7 +237,8 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati t.UnRegisterVolumeLayout(v, dn) } for _, v := range changedVolumes { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.EnsureCorrectWritables(&v) } return diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 93b39bb5d..022eeb578 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -18,6 +18,7 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf for _, shardInfo := range shardInfos { shards = append(shards, erasure_coding.NewEcVolumeInfo( + shardInfo.DiskType, shardInfo.Collection, needle.VolumeId(shardInfo.Id), erasure_coding.ShardBits(shardInfo.EcIndexBits))) @@ -39,6 +40,7 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards for _, shardInfo := range newEcShards { newShards = append(newShards, erasure_coding.NewEcVolumeInfo( + shardInfo.DiskType, shardInfo.Collection, needle.VolumeId(shardInfo.Id), erasure_coding.ShardBits(shardInfo.EcIndexBits))) @@ -46,6 +48,7 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards for _, shardInfo := range deletedEcShards { deletedShards = append(deletedShards, erasure_coding.NewEcVolumeInfo( + shardInfo.DiskType, shardInfo.Collection, needle.VolumeId(shardInfo.Id), erasure_coding.ShardBits(shardInfo.EcIndexBits))) diff --git a/weed/topology/topology_event_handling.go 
b/weed/topology/topology_event_handling.go index 068bd401e..543dacf29 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -1,6 +1,7 @@ package topology import ( + "github.com/chrislusf/seaweedfs/weed/storage/types" "google.golang.org/grpc" "math/rand" "time" @@ -37,7 +38,8 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) + diskType := types.ToDiskType(volumeInfo.DiskType) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -47,7 +49,13 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { for _, dn := range vl.vid2location[volumeInfo.Id].list { if !volumeInfo.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(-1) + + disk := dn.getOrCreateDisk(volumeInfo.DiskType) + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(volumeInfo.DiskType)) + deltaDiskUsage.activeVolumeCount = -1 + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } } return true @@ -55,13 +63,14 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.GetVolumes() { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn.Id()) - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.SetVolumeUnavailable(dn, v.Id) } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) - dn.UpAdjustRemoteVolumeCountDelta(-dn.GetRemoteVolumeCount()) - dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) - dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) + + negativeUsages := dn.GetDiskUsages().negative() + dn.UpAdjustDiskUsageDelta(negativeUsages) + if dn.Parent() != nil { dn.Parent().UnlinkChildNode(dn.Id()) } diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go index 73c55d77d..0fedb6221 100644 --- a/weed/topology/topology_map.go +++ b/weed/topology/topology_map.go @@ -4,8 +4,8 @@ import "github.com/chrislusf/seaweedfs/weed/pb/master_pb" func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) - m["Max"] = t.GetMaxVolumeCount() - m["Free"] = t.FreeSpace() + m["Max"] = t.diskUsages.GetMaxVolumeCount() + m["Free"] = t.diskUsages.FreeSpace() var dcs []interface{} for _, c := range t.Children() { dc := c.(*DataCenter) @@ -29,8 +29,8 @@ func (t *Topology) ToMap() interface{} { func (t *Topology) ToVolumeMap() interface{} { m := make(map[string]interface{}) - m["Max"] = t.GetMaxVolumeCount() - m["Free"] = t.FreeSpace() + m["Max"] = t.diskUsages.GetMaxVolumeCount() + m["Free"] = t.diskUsages.FreeSpace() dcs := make(map[NodeId]interface{}) for _, c := range t.Children() { dc := c.(*DataCenter) @@ -80,12 +80,8 @@ func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocat func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo { m := &master_pb.TopologyInfo{ - Id: string(t.Id()), - VolumeCount: uint64(t.GetVolumeCount()), - MaxVolumeCount: uint64(t.GetMaxVolumeCount()), - FreeVolumeCount: uint64(t.FreeSpace()), - ActiveVolumeCount: uint64(t.GetActiveVolumeCount()), - 
RemoteVolumeCount: uint64(t.GetRemoteVolumeCount()), + Id: string(t.Id()), + DiskInfos: t.diskUsages.ToDiskInfo(), } for _, c := range t.Children() { dc := c.(*DataCenter) diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 2fe381ca2..ecfe9d8d1 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "testing" ) @@ -13,11 +14,11 @@ import ( func TestRemoveDataCenter(t *testing.T) { topo := setup(topologyLayout) topo.UnlinkChildNode(NodeId("dc2")) - if topo.GetActiveVolumeCount() != 15 { + if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 15 { t.Fail() } topo.UnlinkChildNode(NodeId("dc3")) - if topo.GetActiveVolumeCount() != 12 { + if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 12 { t.Fail() } } @@ -27,7 +28,10 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25) + maxVolumeCounts := make(map[string]uint32) + maxVolumeCounts[""] = 25 + maxVolumeCounts["ssd"] = 12 + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts) { volumeCount := 7 @@ -48,10 +52,30 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { volumeMessages = append(volumeMessages, volumeMessage) } + for k := 1; k <= volumeCount; k++ { + volumeMessage := &master_pb.VolumeInformationMessage{ + Id: uint32(volumeCount + k), + Size: uint64(25432), + Collection: "", + FileCount: uint64(2343), + DeleteCount: uint64(345), + DeletedByteCount: 34524, + ReadOnly: false, + ReplicaPlacement: uint32(0), + Version: uint32(needle.CurrentVersion), + Ttl: 0, + DiskType: "ssd", + } + volumeMessages = append(volumeMessages, volumeMessage) + } + topo.SyncDataNodeRegistration(volumeMessages, dn) - assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) - assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + usageCounts := topo.diskUsages.usages[types.HardDriveType] + + assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount) + assert(t, "ssdVolumeCount", int(topo.diskUsages.usages[types.SsdType].volumeCount), volumeCount) } { @@ -78,8 +102,10 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { //layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL) //assert(t, "writables", len(layout.writables), volumeCount) - assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) - assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + usageCounts := topo.diskUsages.usages[types.HardDriveType] + + assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount) } { @@ -96,26 +122,28 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { nil, dn) rp, _ := super_block.NewReplicaPlacementFromString("000") - layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL) + layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType) assert(t, "writables after repeated add", len(layout.writables), volumeCount) - assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) - assert(t, "volumeCount", 
int(topo.volumeCount), volumeCount) + usageCounts := topo.diskUsages.usages[types.HardDriveType] + + assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount) topo.IncrementalSyncDataNodeRegistration( nil, []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, dn) assert(t, "writables after deletion", len(layout.writables), volumeCount-1) - assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1) - assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1) + assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount-1) + assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount-1) topo.IncrementalSyncDataNodeRegistration( []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, nil, dn) - for vid, _ := range layout.vid2location { + for vid := range layout.vid2location { println("after add volume id", vid) } for _, vid := range layout.writables { @@ -128,7 +156,9 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { topo.UnRegisterDataNode(dn) - assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0) + usageCounts := topo.diskUsages.usages[types.HardDriveType] + + assert(t, "activeVolumeCount2", int(usageCounts.activeVolumeCount), 0) } @@ -144,12 +174,16 @@ func TestAddRemoveVolume(t *testing.T) { dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25) + maxVolumeCounts := make(map[string]uint32) + maxVolumeCounts[""] = 25 + maxVolumeCounts["ssd"] = 12 + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts) v := storage.VolumeInfo{ Id: needle.VolumeId(1), Size: 100, Collection: "xcollection", + DiskType: "ssd", FileCount: 123, DeleteCount: 23, DeletedByteCount: 45, diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 58b5702bf..4b0a4837e 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -2,6 +2,7 @@ package topology import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" @@ -27,6 +28,7 @@ type VolumeGrowOption struct { Collection string ReplicaPlacement *super_block.ReplicaPlacement Ttl *needle.TTL + DiskType types.DiskType Prealloacte int64 DataCenter string Rack string @@ -113,21 +115,21 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { //find main datacenter and other data centers rp := option.ReplicaPlacement - mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, func(node Node) error { + mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) } if len(node.Children()) < rp.DiffRackCount+1 { return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) } - if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) + if node.AvailableSpaceFor(option) < int64(rp.DiffRackCount+rp.SameRackCount+1) { + return 
fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.DiffRackCount+rp.SameRackCount+1) } possibleRacksCount := 0 for _, rack := range node.Children() { possibleDataNodesCount := 0 for _, n := range rack.Children() { - if n.FreeSpace() >= 1 { + if n.AvailableSpaceFor(option) >= 1 { possibleDataNodesCount++ } } @@ -145,12 +147,12 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } //find main rack and other racks - mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, func(node Node) error { + mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, option, func(node Node) error { if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } - if node.FreeSpace() < int64(rp.SameRackCount+1) { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) + if node.AvailableSpaceFor(option) < int64(rp.SameRackCount+1) { + return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.SameRackCount+1) } if len(node.Children()) < rp.SameRackCount+1 { // a bit faster way to test free racks @@ -158,7 +160,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } possibleDataNodesCount := 0 for _, n := range node.Children() { - if n.FreeSpace() >= 1 { + if n.AvailableSpaceFor(option) >= 1 { possibleDataNodesCount++ } } @@ -172,12 +174,12 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } //find main rack and other racks - mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, func(node Node) error { + mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, option, func(node Node) error { if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) } - if node.FreeSpace() < 1 { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) + if node.AvailableSpaceFor(option) < 1 { + return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), 1) } return nil }) @@ -190,16 +192,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum servers = append(servers, server.(*DataNode)) } for _, rack := range otherRacks { - r := rand.Int63n(rack.FreeSpace()) - if server, e := rack.ReserveOneVolume(r); e == nil { + r := rand.Int63n(rack.AvailableSpaceFor(option)) + if server, e := rack.ReserveOneVolume(r, option); e == nil { servers = append(servers, server) } else { return servers, e } } for _, datacenter := range otherDataCenters { - r := rand.Int63n(datacenter.FreeSpace()) - if server, e := datacenter.ReserveOneVolume(r); e == nil { + r := rand.Int63n(datacenter.AvailableSpaceFor(option)) + if server, e := datacenter.ReserveOneVolume(r, option); e == nil { servers = append(servers, server) } else { return servers, e @@ -218,6 +220,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid ReplicaPlacement: option.ReplicaPlacement, Ttl: option.Ttl, Version: needle.CurrentVersion, + DiskType: string(option.DiskType), } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index bc9083fd2..ab30cd43f 100644 --- a/weed/topology/volume_growth_test.go 
+++ b/weed/topology/volume_growth_test.go @@ -103,7 +103,13 @@ func setup(topologyLayout string) *Topology { Version: needle.CurrentVersion} server.AddOrUpdateVolume(vi) } - server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64))) + + disk := server.getOrCreateDisk("") + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("") + deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64)) + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } } } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index e2d6c170c..5784c894b 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -3,6 +3,7 @@ package topology import ( "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" "time" @@ -103,6 +104,7 @@ func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { type VolumeLayout struct { rp *super_block.ReplicaPlacement ttl *needle.TTL + diskType types.DiskType vid2location map[needle.VolumeId]*VolumeLocationList writables []needle.VolumeId // transient array of writable volume id readonlyVolumes *volumesBinaryState // readonly volumes @@ -118,10 +120,11 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout { +func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, + diskType: diskType, vid2location: make(map[needle.VolumeId]*VolumeLocationList), writables: *new([]needle.VolumeId), readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()), |
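
Alongside the usage counters, the diff threads `DiskType` into the volume-layout lookup: `Collection.GetOrCreateVolumeLayout` and the new `DeleteVolumeLayout` append the disk type to the layout key, but only when it differs from the default hard-drive type, so existing keys stay unchanged. The snippet below illustrates just that key construction; it uses plain strings as stand-ins for `*super_block.ReplicaPlacement`, `*needle.TTL`, and `types.DiskType`, and the example values ("000", "3d", "ssd") are illustrative only.

```go
package main

import "fmt"

// volumeLayoutKey mirrors how GetOrCreateVolumeLayout / DeleteVolumeLayout
// build their map key in the diff above: replica placement, then TTL, then
// the disk type — omitting the default hard-drive type (the empty string,
// as in the test's maxVolumeCounts[""] entry) for backward compatibility.
func volumeLayoutKey(replicaPlacement, ttl, diskType string) string {
	key := replicaPlacement
	if ttl != "" {
		key += ttl
	}
	if diskType != "" { // types.HardDriveType in the real code
		key += diskType
	}
	return key
}

func main() {
	fmt.Println(volumeLayoutKey("000", "", ""))      // "000"      — hard drive, no TTL
	fmt.Println(volumeLayoutKey("000", "3d", "ssd")) // "0003dssd" — ssd layout gets its own key
}
```

Because each (replication, TTL, disk type) combination now has its own layout, `UnRegisterVolumeLayout` deletes only that layout via `DeleteLayout`, and removes the collection only once no layouts remain.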
