Diffstat (limited to 'weed/topology')
-rw-r--r--  weed/topology/allocate_volume.go          |   1
-rw-r--r--  weed/topology/collection.go               |  28
-rw-r--r--  weed/topology/data_center.go              |  15
-rw-r--r--  weed/topology/data_node.go                | 232
-rw-r--r--  weed/topology/data_node_ec.go             | 129
-rw-r--r--  weed/topology/disk.go                     | 270
-rw-r--r--  weed/topology/disk_ec.go                  |  84
-rw-r--r--  weed/topology/node.go                     | 205
-rw-r--r--  weed/topology/rack.go                     |  20
-rw-r--r--  weed/topology/store_replicate.go          |  84
-rw-r--r--  weed/topology/topology.go                 |  79
-rw-r--r--  weed/topology/topology_ec.go              |   3
-rw-r--r--  weed/topology/topology_event_handling.go  |  23
-rw-r--r--  weed/topology/topology_map.go             |  16
-rw-r--r--  weed/topology/topology_test.go            |  68
-rw-r--r--  weed/topology/topology_vacuum.go          |  62
-rw-r--r--  weed/topology/volume_growth.go            |  68
-rw-r--r--  weed/topology/volume_growth_test.go       | 219
-rw-r--r--  weed/topology/volume_layout.go            | 181
-rw-r--r--  weed/topology/volume_layout_test.go       | 116
-rw-r--r--  weed/topology/volume_location_list.go     |  13
21 files changed, 1455 insertions, 461 deletions
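
The diff below threads a disk type through the whole topology: each DataNode gains Disk child nodes, and the per-node counters (volume, active, remote, EC shard, max volume) move into a DiskUsages map keyed by disk type, adjusted upward through the tree with a single UpAdjustDiskUsageDelta call instead of the old per-counter UpAdjust*CountDelta methods. The following is a minimal, self-contained Go sketch of that accounting pattern for orientation only; the type and function names are simplified stand-ins, not SeaweedFS code.

// A minimal sketch (not SeaweedFS code) of the per-disk-type accounting this
// diff introduces: usage counters keyed by disk type, adjusted as a delta that
// propagates up the topology tree, mirroring DiskUsages/UpAdjustDiskUsageDelta.
package main

import (
	"fmt"
	"sync"
)

type DiskType string

const (
	HardDrive DiskType = ""    // default disk type (empty string, as in the tests)
	SSD       DiskType = "ssd" // additional disk type exercised by the tests
)

// usageCounts holds the per-disk-type counters (simplified to two fields here).
type usageCounts struct {
	volumeCount    int64
	maxVolumeCount int64
}

// diskUsages maps disk type -> counters, loosely modeled on DiskUsages in the diff.
type diskUsages struct {
	sync.Mutex
	usages map[DiskType]*usageCounts
}

func newDiskUsages() *diskUsages {
	return &diskUsages{usages: make(map[DiskType]*usageCounts)}
}

func (d *diskUsages) getOrCreate(t DiskType) *usageCounts {
	d.Lock()
	defer d.Unlock()
	u, ok := d.usages[t]
	if !ok {
		u = &usageCounts{}
		d.usages[t] = u
	}
	return u
}

// node stands in for any topology layer: Disk, DataNode, Rack, DataCenter, Topology.
type node struct {
	id     string
	parent *node
	usages *diskUsages
}

// upAdjust applies a per-disk-type delta at this node, then at every ancestor,
// the way UpAdjustDiskUsageDelta does in the diff (deltas may be negative).
func (n *node) upAdjust(delta *diskUsages) {
	for t, d := range delta.usages {
		u := n.usages.getOrCreate(t)
		u.volumeCount += d.volumeCount
		u.maxVolumeCount += d.maxVolumeCount
	}
	if n.parent != nil {
		n.parent.upAdjust(delta)
	}
}

func main() {
	topo := &node{id: "topo", usages: newDiskUsages()}
	rack := &node{id: "rack1", parent: topo, usages: newDiskUsages()}
	disk := &node{id: "ssd", parent: rack, usages: newDiskUsages()}

	// Registering one SSD volume is expressed as a +1 delta on the "ssd" bucket;
	// after propagation the count is visible at every level of the tree.
	delta := newDiskUsages()
	delta.getOrCreate(SSD).volumeCount = 1
	disk.upAdjust(delta)

	fmt.Println("ssd volumes at topology level:", topo.usages.getOrCreate(SSD).volumeCount) // 1
}

In the actual change the same kind of delta object is built wherever a volume or EC shard is added, removed, or re-registered, so every ancestor node (DataNode, Rack, DataCenter, Topology) can report per-disk-type DiskInfos in its protobuf messages rather than a single flat set of counters.
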
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index e5dc48652..39c24ab04 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -24,6 +24,7 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol Ttl: option.Ttl.String(), Preallocate: option.Prealloacte, MemoryMapMaxSizeMb: option.MemoryMapMaxSizeMb, + DiskType: string(option.DiskType), }) return deleteErr }) diff --git a/weed/topology/collection.go b/weed/topology/collection.go index 7a611d904..a14b68851 100644 --- a/weed/topology/collection.go +++ b/weed/topology/collection.go @@ -2,6 +2,7 @@ package topology import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" @@ -11,11 +12,16 @@ import ( type Collection struct { Name string volumeSizeLimit uint64 + replicationAsMin bool storageType2VolumeLayout *util.ConcurrentReadMap } -func NewCollection(name string, volumeSizeLimit uint64) *Collection { - c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} +func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection { + c := &Collection{ + Name: name, + volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, + } c.storageType2VolumeLayout = util.NewConcurrentReadMap() return c } @@ -24,17 +30,31 @@ func (c *Collection) String() string { return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) } -func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { +func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout { keyString := rp.String() if ttl != nil { keyString += ttl.String() } + if diskType != types.HardDriveType { + keyString += string(diskType) + } vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { - return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + return NewVolumeLayout(rp, ttl, diskType, c.volumeSizeLimit, c.replicationAsMin) }) return vl.(*VolumeLayout) } +func (c *Collection) DeleteVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) { + keyString := rp.String() + if ttl != nil { + keyString += ttl.String() + } + if diskType != types.HardDriveType { + keyString += string(diskType) + } + c.storageType2VolumeLayout.Delete(keyString) +} + func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode { for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go index dc3accb71..60d91ba6d 100644 --- a/weed/topology/data_center.go +++ b/weed/topology/data_center.go @@ -1,6 +1,8 @@ package topology -import "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +import ( + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +) type DataCenter struct { NodeImpl @@ -10,6 +12,7 @@ func NewDataCenter(id string) *DataCenter { dc := &DataCenter{} dc.id = NodeId(id) dc.nodeType = "DataCenter" + dc.diskUsages = newDiskUsages() dc.children = make(map[NodeId]Node) dc.NodeImpl.value = dc return dc @@ -30,8 +33,6 @@ func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { func (dc *DataCenter) ToMap() interface{} { m := make(map[string]interface{}) m["Id"] = dc.Id() - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() var racks 
[]interface{} for _, c := range dc.Children() { rack := c.(*Rack) @@ -43,12 +44,8 @@ func (dc *DataCenter) ToMap() interface{} { func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo { m := &master_pb.DataCenterInfo{ - Id: string(dc.Id()), - VolumeCount: uint64(dc.GetVolumeCount()), - MaxVolumeCount: uint64(dc.GetMaxVolumeCount()), - FreeVolumeCount: uint64(dc.FreeSpace()), - ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()), - RemoteVolumeCount: uint64(dc.GetRemoteVolumeCount()), + Id: string(dc.Id()), + DiskInfos: dc.diskUsages.ToDiskInfo(), } for _, c := range dc.Children() { rack := c.(*Rack) diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 617341e54..69f739dd5 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,12 +2,11 @@ package topology import ( "fmt" - "strconv" - "sync" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" @@ -15,122 +14,161 @@ import ( type DataNode struct { NodeImpl - volumes map[needle.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds - ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo - ecShardsLock sync.RWMutex + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds } func NewDataNode(id string) *DataNode { - s := &DataNode{} - s.id = NodeId(id) - s.nodeType = "DataNode" - s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) - s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) - s.NodeImpl.value = s - return s + dn := &DataNode{} + dn.id = NodeId(id) + dn.nodeType = "DataNode" + dn.diskUsages = newDiskUsages() + dn.children = make(map[NodeId]Node) + dn.NodeImpl.value = dn + return dn } func (dn *DataNode) String() string { dn.RLock() defer dn.RUnlock() - return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) + return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl) } -func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { dn.Lock() defer dn.Unlock() - if oldV, ok := dn.volumes[v.Id]; !ok { - dn.volumes[v.Id] = v - dn.UpAdjustVolumeCountDelta(1) - if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(1) - } - if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(1) - } - dn.UpAdjustMaxVolumeId(v.Id) - isNew = true - } else { - if oldV.IsRemote() != v.IsRemote() { - if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(1) - } - if oldV.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) - } - } - dn.volumes[v.Id] = v + return dn.doAddOrUpdateVolume(v) +} + +func (dn *DataNode) getOrCreateDisk(diskType string) *Disk { + c, found := dn.children[NodeId(diskType)] + if !found { + c = NewDisk(diskType) + dn.doLinkChildNode(c) } - return + disk := c.(*Disk) + return disk +} + +func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { + disk := dn.getOrCreateDisk(v.DiskType) + return disk.AddOrUpdateVolume(v) } -func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, 
deletedVolumes []storage.VolumeInfo) { +// UpdateVolumes detects new/deleted/changed volumes on a volume server +// used in master to notify master clients of these changes. +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { + actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } + dn.Lock() - for vid, v := range dn.volumes { + defer dn.Unlock() + + existingVolumes := dn.getVolumes() + + for _, v := range existingVolumes { + vid := v.Id if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) - delete(dn.volumes, vid) + disk := dn.getOrCreateDisk(v.DiskType) + delete(disk.volumes, vid) deletedVolumes = append(deletedVolumes, v) - dn.UpAdjustVolumeCountDelta(-1) + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage.volumeCount = -1 if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) + deltaDiskUsage.remoteVolumeCount = -1 } if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(-1) + deltaDiskUsage.activeVolumeCount = -1 } + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } } - dn.Unlock() for _, v := range actualVolumes { - isNew := dn.AddOrUpdateVolume(v) + isNew, isChangedRO := dn.doAddOrUpdateVolume(v) if isNew { newVolumes = append(newVolumes, v) } + if isChangedRO { + changeRO = append(changeRO, v) + } } return } -func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) { +func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) { dn.Lock() + defer dn.Unlock() + for _, v := range deletedVolumes { - delete(dn.volumes, v.Id) - dn.UpAdjustVolumeCountDelta(-1) + disk := dn.getOrCreateDisk(v.DiskType) + delete(disk.volumes, v.Id) + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage.volumeCount = -1 if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) + deltaDiskUsage.remoteVolumeCount = -1 } if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(-1) + deltaDiskUsage.activeVolumeCount = -1 } + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } - dn.Unlock() - for _, v := range newlVolumes { - dn.AddOrUpdateVolume(v) + for _, v := range newVolumes { + dn.doAddOrUpdateVolume(v) } return } +func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) { + deltaDiskUsages := newDiskUsages() + for diskType, maxVolumeCount := range maxVolumeCounts { + if maxVolumeCount == 0 { + // the volume server may have set the max to zero + continue + } + dt := types.ToDiskType(diskType) + currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt) + if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) { + continue + } + disk := dn.getOrCreateDisk(dt.String()) + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt) + deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } +} + func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { dn.RLock() - for _, v := range dn.volumes { - ret = append(ret, v) + for _, c := range dn.children { + disk := c.(*Disk) + ret = append(ret, disk.GetVolumes()...) 
} dn.RUnlock() return ret } -func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) { dn.RLock() defer dn.RUnlock() - vInfo, ok := dn.volumes[id] - if ok { + found := false + for _, c := range dn.children { + disk := c.(*Disk) + vInfo, found = disk.volumes[id] + if found { + break + } + } + if found { return vInfo, nil } else { return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found") @@ -138,7 +176,10 @@ func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, erro } func (dn *DataNode) GetDataCenter() *DataCenter { - return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) + rack := dn.Parent() + dcNode := rack.Parent() + dcValue := dcNode.GetValue() + return dcValue.(*DataCenter) } func (dn *DataNode) GetRack() *Rack { @@ -165,28 +206,61 @@ func (dn *DataNode) Url() string { func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() - ret["Volumes"] = dn.GetVolumeCount() - ret["EcShards"] = dn.GetEcShardCount() - ret["Max"] = dn.GetMaxVolumeCount() - ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl + + // aggregated volume info + var volumeCount, ecShardCount, maxVolumeCount int64 + var volumeIds string + for _, diskUsage := range dn.diskUsages.usages { + volumeCount += diskUsage.volumeCount + ecShardCount += diskUsage.ecShardCount + maxVolumeCount += diskUsage.maxVolumeCount + } + + for _, disk := range dn.Children() { + d := disk.(*Disk) + volumeIds += " " + d.GetVolumeIds() + } + + ret["Volumes"] = volumeCount + ret["EcShards"] = ecShardCount + ret["Max"] = maxVolumeCount + ret["VolumeIds"] = volumeIds + return ret } func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { m := &master_pb.DataNodeInfo{ - Id: string(dn.Id()), - VolumeCount: uint64(dn.GetVolumeCount()), - MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), - FreeVolumeCount: uint64(dn.FreeSpace()), - ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), - RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()), - } - for _, v := range dn.GetVolumes() { - m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) + Id: string(dn.Id()), + DiskInfos: make(map[string]*master_pb.DiskInfo), } - for _, ecv := range dn.GetEcShards() { - m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + for _, c := range dn.Children() { + disk := c.(*Disk) + m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo() } return m } + +// GetVolumeIds returns the human readable volume ids limited to count of max 100. +func (dn *DataNode) GetVolumeIds() string { + dn.RLock() + defer dn.RUnlock() + existingVolumes := dn.getVolumes() + ids := make([]int, 0, len(existingVolumes)) + + for k := range existingVolumes { + ids = append(ids, int(k)) + } + + return util.HumanReadableIntsMax(100, ids...) +} + +func (dn *DataNode) getVolumes() []storage.VolumeInfo { + var existingVolumes []storage.VolumeInfo + for _, c := range dn.children { + disk := c.(*Disk) + existingVolumes = append(existingVolumes, disk.GetVolumes()...) 
+ } + return existingVolumes +} diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index 75c8784fe..330b16b24 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -3,12 +3,14 @@ package topology import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { dn.RLock() - for _, ecVolumeInfo := range dn.ecShards { - ret = append(ret, ecVolumeInfo) + for _, c := range dn.children { + disk := c.(*Disk) + ret = append(ret, disk.GetEcShards()...) } dn.RUnlock() return ret @@ -21,10 +23,17 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) actualEcShardMap[ecShards.VolumeId] = ecShards } - // found out the newShards and deletedShards + existingEcShards := dn.GetEcShards() + + // find out the newShards and deletedShards var newShardCount, deletedShardCount int - dn.ecShardsLock.RLock() - for vid, ecShards := range dn.ecShards { + for _, ecShards := range existingEcShards { + + disk := dn.getOrCreateDisk(ecShards.DiskType) + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) + + vid := ecShards.VolumeId if actualEcShards, ok := actualEcShardMap[vid]; !ok { // dn registered ec shards not found in the new set of ec shards deletedShards = append(deletedShards, ecShards) @@ -42,26 +51,60 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) deletedShardCount += d.ShardIdCount() } } + + deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount) + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } + for _, ecShards := range actualShards { - if _, found := dn.ecShards[ecShards.VolumeId]; !found { - newShards = append(newShards, ecShards) - newShardCount += ecShards.ShardIdCount() + if dn.hasEcShards(ecShards.VolumeId) { + continue } + + newShards = append(newShards, ecShards) + + disk := dn.getOrCreateDisk(ecShards.DiskType) + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType)) + deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount()) + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } - dn.ecShardsLock.RUnlock() if len(newShards) > 0 || len(deletedShards) > 0 { // if changed, set to the new ec shard map - dn.ecShardsLock.Lock() - dn.ecShards = actualEcShardMap - dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount)) - dn.ecShardsLock.Unlock() + dn.doUpdateEcShards(actualShards) } return } +func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) { + dn.RLock() + defer dn.RUnlock() + for _, c := range dn.children { + disk := c.(*Disk) + _, found = disk.ecShards[volumeId] + if found { + return + } + } + return +} + +func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) { + dn.Lock() + for _, c := range dn.children { + disk := c.(*Disk) + disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) + } + for _, shard := range actualShards { + disk := dn.getOrCreateDisk(shard.DiskType) + disk.ecShards[shard.VolumeId] = shard + } + dn.Unlock() +} + func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) { for _, newShard := range newShards { @@ -75,61 +118,25 @@ func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_codi } func 
(dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { - dn.ecShardsLock.Lock() - defer dn.ecShardsLock.Unlock() - - delta := 0 - if existing, ok := dn.ecShards[s.VolumeId]; !ok { - dn.ecShards[s.VolumeId] = s - delta = s.ShardBits.ShardIdCount() - } else { - oldCount := existing.ShardBits.ShardIdCount() - existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) - delta = existing.ShardBits.ShardIdCount() - oldCount - } - - dn.UpAdjustEcShardCountDelta(int64(delta)) - + disk := dn.getOrCreateDisk(s.DiskType) + disk.AddOrUpdateEcShard(s) } func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { - dn.ecShardsLock.Lock() - defer dn.ecShardsLock.Unlock() - - if existing, ok := dn.ecShards[s.VolumeId]; ok { - oldCount := existing.ShardBits.ShardIdCount() - existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) - delta := existing.ShardBits.ShardIdCount() - oldCount - dn.UpAdjustEcShardCountDelta(int64(delta)) - if existing.ShardBits.ShardIdCount() == 0 { - delete(dn.ecShards, s.VolumeId) - } - } - + disk := dn.getOrCreateDisk(s.DiskType) + disk.DeleteEcShard(s) } -func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { +func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) { - // check whether normal volumes has this volume id dn.RLock() - _, ok := dn.volumes[id] - if ok { - hasVolumeId = true - } - dn.RUnlock() - - if hasVolumeId { - return - } - - // check whether ec shards has this volume id - dn.ecShardsLock.RLock() - _, ok = dn.ecShards[id] - if ok { - hasVolumeId = true + defer dn.RUnlock() + for _, c := range dn.children { + disk := c.(*Disk) + if disk.HasVolumesById(volumeId) { + return true + } } - dn.ecShardsLock.RUnlock() - - return + return false } diff --git a/weed/topology/disk.go b/weed/topology/disk.go new file mode 100644 index 000000000..a085f8dff --- /dev/null +++ b/weed/topology/disk.go @@ -0,0 +1,270 @@ +package topology + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "sync" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + + "github.com/chrislusf/seaweedfs/weed/storage" +) + +type Disk struct { + NodeImpl + volumes map[needle.VolumeId]storage.VolumeInfo + ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo + ecShardsLock sync.RWMutex +} + +func NewDisk(diskType string) *Disk { + s := &Disk{} + s.id = NodeId(diskType) + s.nodeType = "Disk" + s.diskUsages = newDiskUsages() + s.volumes = make(map[needle.VolumeId]storage.VolumeInfo, 2) + s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo, 2) + s.NodeImpl.value = s + return s +} + +type DiskUsages struct { + sync.RWMutex + usages map[types.DiskType]*DiskUsageCounts +} + +func newDiskUsages() *DiskUsages { + return &DiskUsages{ + usages: make(map[types.DiskType]*DiskUsageCounts), + } +} + +func (d *DiskUsages) negative() *DiskUsages { + d.RLock() + defer d.RUnlock() + t := newDiskUsages() + for diskType, b := range d.usages { + a := t.getOrCreateDisk(diskType) + a.volumeCount = -b.volumeCount + a.remoteVolumeCount = -b.remoteVolumeCount + a.activeVolumeCount = -b.activeVolumeCount + a.ecShardCount = -b.ecShardCount + a.maxVolumeCount = -b.maxVolumeCount + + } + return t +} + +func (d *DiskUsages) ToDiskInfo() map[string]*master_pb.DiskInfo { + ret := make(map[string]*master_pb.DiskInfo) + for diskType, diskUsageCounts := range d.usages { 
+ m := &master_pb.DiskInfo{ + VolumeCount: uint64(diskUsageCounts.volumeCount), + MaxVolumeCount: uint64(diskUsageCounts.maxVolumeCount), + FreeVolumeCount: uint64(diskUsageCounts.maxVolumeCount - diskUsageCounts.volumeCount), + ActiveVolumeCount: uint64(diskUsageCounts.activeVolumeCount), + RemoteVolumeCount: uint64(diskUsageCounts.remoteVolumeCount), + } + ret[string(diskType)] = m + } + return ret +} + +func (d *DiskUsages) FreeSpace() (freeSpace int64) { + d.RLock() + defer d.RUnlock() + for _, diskUsage := range d.usages { + freeSpace += diskUsage.FreeSpace() + } + return +} + +func (d *DiskUsages) GetMaxVolumeCount() (maxVolumeCount int64) { + d.RLock() + defer d.RUnlock() + for _, diskUsage := range d.usages { + maxVolumeCount += diskUsage.maxVolumeCount + } + return +} + +type DiskUsageCounts struct { + volumeCount int64 + remoteVolumeCount int64 + activeVolumeCount int64 + ecShardCount int64 + maxVolumeCount int64 +} + +func (a *DiskUsageCounts) addDiskUsageCounts(b *DiskUsageCounts) { + a.volumeCount += b.volumeCount + a.remoteVolumeCount += b.remoteVolumeCount + a.activeVolumeCount += b.activeVolumeCount + a.ecShardCount += b.ecShardCount + a.maxVolumeCount += b.maxVolumeCount +} + +func (a *DiskUsageCounts) FreeSpace() int64 { + freeVolumeSlotCount := a.maxVolumeCount + a.remoteVolumeCount - a.volumeCount + if a.ecShardCount > 0 { + freeVolumeSlotCount = freeVolumeSlotCount - a.ecShardCount/erasure_coding.DataShardsCount - 1 + } + return freeVolumeSlotCount +} + +func (a *DiskUsageCounts) minus(b *DiskUsageCounts) *DiskUsageCounts { + return &DiskUsageCounts{ + volumeCount: a.volumeCount - b.volumeCount, + remoteVolumeCount: a.remoteVolumeCount - b.remoteVolumeCount, + activeVolumeCount: a.activeVolumeCount - b.activeVolumeCount, + ecShardCount: a.ecShardCount - b.ecShardCount, + maxVolumeCount: a.maxVolumeCount - b.maxVolumeCount, + } +} + +func (du *DiskUsages) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts { + du.Lock() + defer du.Unlock() + t, found := du.usages[diskType] + if found { + return t + } + t = &DiskUsageCounts{} + du.usages[diskType] = t + return t +} + +func (d *Disk) String() string { + d.RLock() + defer d.RUnlock() + return fmt.Sprintf("Disk:%s, volumes:%v, ecShards:%v", d.NodeImpl.String(), d.volumes, d.ecShards) +} + +func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { + d.Lock() + defer d.Unlock() + return d.doAddOrUpdateVolume(v) +} + +func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + if oldV, ok := d.volumes[v.Id]; !ok { + d.volumes[v.Id] = v + deltaDiskUsage.volumeCount = 1 + if v.IsRemote() { + deltaDiskUsage.remoteVolumeCount = 1 + } + if !v.ReadOnly { + deltaDiskUsage.activeVolumeCount = 1 + } + d.UpAdjustMaxVolumeId(v.Id) + d.UpAdjustDiskUsageDelta(deltaDiskUsages) + isNew = true + } else { + if oldV.IsRemote() != v.IsRemote() { + if v.IsRemote() { + deltaDiskUsage.remoteVolumeCount = 1 + } + if oldV.IsRemote() { + deltaDiskUsage.remoteVolumeCount = -1 + } + d.UpAdjustDiskUsageDelta(deltaDiskUsages) + } + isChangedRO = d.volumes[v.Id].ReadOnly != v.ReadOnly + d.volumes[v.Id] = v + } + return +} + +func (d *Disk) GetVolumes() (ret []storage.VolumeInfo) { + d.RLock() + for _, v := range d.volumes { + ret = append(ret, v) + } + d.RUnlock() + return ret +} + +func (d *Disk) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { + d.RLock() + defer 
d.RUnlock() + vInfo, ok := d.volumes[id] + if ok { + return vInfo, nil + } else { + return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found") + } +} + +func (d *Disk) GetDataCenter() *DataCenter { + dn := d.Parent() + rack := dn.Parent() + dcNode := rack.Parent() + dcValue := dcNode.GetValue() + return dcValue.(*DataCenter) +} + +func (d *Disk) GetRack() *Rack { + return d.Parent().Parent().(*NodeImpl).value.(*Rack) +} + +func (d *Disk) GetTopology() *Topology { + p := d.Parent() + for p.Parent() != nil { + p = p.Parent() + } + t := p.(*Topology) + return t +} + +func (d *Disk) ToMap() interface{} { + ret := make(map[string]interface{}) + diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + ret["Volumes"] = diskUsage.volumeCount + ret["VolumeIds"] = d.GetVolumeIds() + ret["EcShards"] = diskUsage.ecShardCount + ret["Max"] = diskUsage.maxVolumeCount + ret["Free"] = d.FreeSpace() + return ret +} + +func (d *Disk) FreeSpace() int64 { + t := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + return t.FreeSpace() +} + +func (d *Disk) ToDiskInfo() *master_pb.DiskInfo { + diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + m := &master_pb.DiskInfo{ + Type: string(d.Id()), + VolumeCount: uint64(diskUsage.volumeCount), + MaxVolumeCount: uint64(diskUsage.maxVolumeCount), + FreeVolumeCount: uint64(diskUsage.maxVolumeCount - diskUsage.volumeCount), + ActiveVolumeCount: uint64(diskUsage.activeVolumeCount), + RemoteVolumeCount: uint64(diskUsage.remoteVolumeCount), + } + for _, v := range d.GetVolumes() { + m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) + } + for _, ecv := range d.GetEcShards() { + m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + } + return m +} + +// GetVolumeIds returns the human readable volume ids limited to count of max 100. +func (d *Disk) GetVolumeIds() string { + d.RLock() + defer d.RUnlock() + ids := make([]int, 0, len(d.volumes)) + + for k := range d.volumes { + ids = append(ids, int(k)) + } + + return util.HumanReadableIntsMax(100, ids...) 
+} diff --git a/weed/topology/disk_ec.go b/weed/topology/disk_ec.go new file mode 100644 index 000000000..74a06b47f --- /dev/null +++ b/weed/topology/disk_ec.go @@ -0,0 +1,84 @@ +package topology + +import ( + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func (d *Disk) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { + d.RLock() + for _, ecVolumeInfo := range d.ecShards { + ret = append(ret, ecVolumeInfo) + } + d.RUnlock() + return ret +} + +func (d *Disk) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { + d.ecShardsLock.Lock() + defer d.ecShardsLock.Unlock() + + delta := 0 + if existing, ok := d.ecShards[s.VolumeId]; !ok { + d.ecShards[s.VolumeId] = s + delta = s.ShardBits.ShardIdCount() + } else { + oldCount := existing.ShardBits.ShardIdCount() + existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) + delta = existing.ShardBits.ShardIdCount() - oldCount + } + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + deltaDiskUsage.ecShardCount = int64(delta) + d.UpAdjustDiskUsageDelta(deltaDiskUsages) + +} + +func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { + d.ecShardsLock.Lock() + defer d.ecShardsLock.Unlock() + + if existing, ok := d.ecShards[s.VolumeId]; ok { + oldCount := existing.ShardBits.ShardIdCount() + existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) + delta := existing.ShardBits.ShardIdCount() - oldCount + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id()))) + deltaDiskUsage.ecShardCount = int64(delta) + d.UpAdjustDiskUsageDelta(deltaDiskUsages) + + if existing.ShardBits.ShardIdCount() == 0 { + delete(d.ecShards, s.VolumeId) + } + } + +} + +func (d *Disk) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { + + // check whether normal volumes has this volume id + d.RLock() + _, ok := d.volumes[id] + if ok { + hasVolumeId = true + } + d.RUnlock() + + if hasVolumeId { + return + } + + // check whether ec shards has this volume id + d.ecShardsLock.RLock() + _, ok = d.ecShards[id] + if ok { + hasVolumeId = true + } + d.ecShardsLock.RUnlock() + + return + +} diff --git a/weed/topology/node.go b/weed/topology/node.go index 572a89d4d..95d63972e 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -2,34 +2,25 @@ package topology import ( "errors" - "math/rand" - "strings" - "sync" - "sync/atomic" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "math/rand" + "strings" + "sync" ) type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int64 - ReserveOneVolume(r int64) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) - UpAdjustVolumeCountDelta(volumeCountDelta int64) - UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) - UpAdjustEcShardCountDelta(ecShardCountDelta int64) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) + AvailableSpaceFor(option *VolumeGrowOption) int64 + ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error) + UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) UpAdjustMaxVolumeId(vid needle.VolumeId) + GetDiskUsages() *DiskUsages - GetVolumeCount() int64 - GetEcShardCount() int64 - 
GetActiveVolumeCount() int64 - GetRemoteVolumeCount() int64 - GetMaxVolumeCount() int64 GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -45,73 +36,81 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { - volumeCount int64 - remoteVolumeCount int64 - activeVolumeCount int64 - ecShardCount int64 - maxVolumeCount int64 - id NodeId - parent Node - sync.RWMutex // lock children - children map[NodeId]Node - maxVolumeId needle.VolumeId + diskUsages *DiskUsages + id NodeId + parent Node + sync.RWMutex // lock children + children map[NodeId]Node + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string value interface{} } +func (n *NodeImpl) GetDiskUsages() *DiskUsages { + return n.diskUsages +} + // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { - candidates := make([]Node, 0, len(n.children)) +func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { + var totalWeights int64 var errs []string n.RLock() + candidates := make([]Node, 0, len(n.children)) + candidatesWeights := make([]int64, 0, len(n.children)) + //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight. for _, node := range n.children { - if err := filterFirstNodeFn(node); err == nil { - candidates = append(candidates, node) - } else { - errs = append(errs, string(node.Id())+":"+err.Error()) + if node.AvailableSpaceFor(option) <= 0 { + continue } + totalWeights += node.AvailableSpaceFor(option) + candidates = append(candidates, node) + candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option)) } n.RUnlock() - if len(candidates) == 0 { - return nil, nil, errors.New("No matching data node found! 
\n" + strings.Join(errs, "\n")) + if len(candidates) < numberOfNodes { + glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates") + return nil, nil, errors.New("No enough data node found!") } - firstNode = candidates[rand.Intn(len(candidates))] - glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) - restNodes = make([]Node, numberOfNodes-1) - candidates = candidates[:0] - n.RLock() - for _, node := range n.children { - if node.Id() == firstNode.Id() { - continue - } - if node.FreeSpace() <= 0 { - continue + //pick nodes randomly by weights, the node picked earlier has higher final weights + sortedCandidates := make([]Node, 0, len(candidates)) + for i := 0; i < len(candidates); i++ { + weightsInterval := rand.Int63n(totalWeights) + lastWeights := int64(0) + for k, weights := range candidatesWeights { + if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) { + sortedCandidates = append(sortedCandidates, candidates[k]) + candidatesWeights[k] = 0 + totalWeights -= weights + break + } + lastWeights += weights } - glog.V(2).Infoln("select rest node candidate:", node.Id()) - candidates = append(candidates, node) } - n.RUnlock() - glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates") - ret := len(restNodes) == 0 - for k, node := range candidates { - if k < len(restNodes) { - restNodes[k] = node - if k == len(restNodes)-1 { - ret = true + + restNodes = make([]Node, 0, numberOfNodes-1) + ret := false + n.RLock() + for k, node := range sortedCandidates { + if err := filterFirstNodeFn(node); err == nil { + firstNode = node + if k >= numberOfNodes-1 { + restNodes = sortedCandidates[:numberOfNodes-1] + } else { + restNodes = append(restNodes, sortedCandidates[:k]...) + restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...) } + ret = true + break } else { - r := rand.Intn(k + 1) - if r < len(restNodes) { - restNodes[r] = node - } + errs = append(errs, string(node.Id())+":"+err.Error()) } } + n.RUnlock() if !ret { - glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates") - err = errors.New("No enough data node found!") + return nil, nil, errors.New("No matching data node found! 
\n" + strings.Join(errs, "\n")) } return } @@ -134,10 +133,14 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int64 { - freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount - if n.ecShardCount > 0 { - freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 +func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts { + return n.diskUsages.getOrCreateDisk(diskType) +} +func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { + t := n.getOrCreateDisk(option.DiskType) + freeVolumeSlotCount := t.maxVolumeCount + t.remoteVolumeCount - t.volumeCount + if t.ecShardCount > 0 { + freeVolumeSlotCount = freeVolumeSlotCount - t.ecShardCount/erasure_coding.DataShardsCount - 1 } return freeVolumeSlotCount } @@ -158,11 +161,11 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { - freeSpace := node.FreeSpace() + freeSpace := node.AvailableSpaceFor(option) // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue @@ -170,11 +173,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) if r >= freeSpace { r -= freeSpace } else { - if node.IsDataNode() && node.FreeSpace() > 0 { + if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 { // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) return node.(*DataNode), nil } - assignedNode, err = node.ReserveOneVolume(r) + assignedNode, err = node.ReserveOneVolume(r, option) if err == nil { return } @@ -183,34 +186,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative - atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative - atomic.AddInt64(&n.volumeCount, volumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) +func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative + for diskType, diskUsage := range deltaDiskUsages.usages { + existingDisk := n.getOrCreateDisk(diskType) + existingDisk.addDiskUsageCounts(diskUsage) } -} -func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative - atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta) if n.parent != nil { - n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative - atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) - if n.parent != nil { - n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) - } -} -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative - atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) + 
n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages) } } func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative @@ -224,33 +206,18 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int64 { - return n.volumeCount -} -func (n *NodeImpl) GetEcShardCount() int64 { - return n.ecShardCount -} -func (n *NodeImpl) GetRemoteVolumeCount() int64 { - return n.remoteVolumeCount -} -func (n *NodeImpl) GetActiveVolumeCount() int64 { - return n.activeVolumeCount -} -func (n *NodeImpl) GetMaxVolumeCount() int64 { - return n.maxVolumeCount -} func (n *NodeImpl) LinkChildNode(node Node) { n.Lock() defer n.Unlock() + n.doLinkChildNode(node) +} + +func (n *NodeImpl) doLinkChildNode(node Node) { if n.children[node.Id()] == nil { n.children[node.Id()] = node - n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) + n.UpAdjustDiskUsageDelta(node.GetDiskUsages()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) - n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) - n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount()) - n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) - n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) } @@ -263,11 +230,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { if node != nil { node.SetParent(nil) delete(n.children, node.Id()) - n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) - n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount()) - n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) - n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) - n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) + n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative()) glog.V(0).Infoln(n, "removes", node.Id()) } } diff --git a/weed/topology/rack.go b/weed/topology/rack.go index 1921c0c05..8eb2a717c 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -2,6 +2,7 @@ package topology import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/types" "strconv" "time" ) @@ -14,6 +15,7 @@ func NewRack(id string) *Rack { r := &Rack{} r.id = NodeId(id) r.nodeType = "Rack" + r.diskUsages = newDiskUsages() r.children = make(map[NodeId]Node) r.NodeImpl.value = r return r @@ -28,7 +30,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode { +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode { for _, c := range r.Children() { dn := c.(*DataNode) if dn.MatchLocation(ip, port) { @@ -40,17 +42,19 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol dn.Ip = ip dn.Port = port dn.PublicUrl = publicUrl - dn.maxVolumeCount = maxVolumeCount dn.LastSeen = time.Now().Unix() r.LinkChildNode(dn) + for diskType, maxVolumeCount := range maxVolumeCounts { + disk := NewDisk(diskType) + disk.diskUsages.getOrCreateDisk(types.ToDiskType(diskType)).maxVolumeCount = int64(maxVolumeCount) + dn.LinkChildNode(disk) + } return dn } func (r *Rack) ToMap() interface{} { m := make(map[string]interface{}) m["Id"] = r.Id() - m["Max"] = r.GetMaxVolumeCount() - m["Free"] = r.FreeSpace() var dns []interface{} for _, c := range r.Children() { dn := c.(*DataNode) @@ -62,12 +66,8 @@ func (r 
*Rack) ToMap() interface{} { func (r *Rack) ToRackInfo() *master_pb.RackInfo { m := &master_pb.RackInfo{ - Id: string(r.Id()), - VolumeCount: uint64(r.GetVolumeCount()), - MaxVolumeCount: uint64(r.GetMaxVolumeCount()), - FreeVolumeCount: uint64(r.FreeSpace()), - ActiveVolumeCount: uint64(r.GetActiveVolumeCount()), - RemoteVolumeCount: uint64(r.GetRemoteVolumeCount()), + Id: string(r.Id()), + DiskInfos: r.diskUsages.ToDiskInfo(), } for _, c := range r.Children() { dn := c.(*DataNode) diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index b195b48ed..ea0a8c968 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -1,7 +1,6 @@ package topology import ( - "bytes" "encoding/json" "errors" "fmt" @@ -15,30 +14,39 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) -func ReplicatedWrite(masterNode string, s *storage.Store, - volumeId needle.VolumeId, n *needle.Needle, - r *http.Request) (size uint32, isUnchanged bool, err error) { +func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) + // check whether this is a replicated write request var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { - remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode) + // this is the initial request + remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterFn) if err != nil { glog.V(0).Infoln(err) return } } - size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n) - if err != nil { - err = fmt.Errorf("failed to write to local disk: %v", err) - glog.V(0).Infoln(err) - return + // read fsync value + fsync := false + if r.FormValue("fsync") == "true" { + fsync = true + } + + if s.GetVolume(volumeId) != nil { + isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, fsync) + if err != nil { + err = fmt.Errorf("failed to write to local disk: %v", err) + glog.V(0).Infoln(err) + return + } } if len(remoteLocations) > 0 { //send to other replica locations @@ -72,12 +80,11 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } } - _, err := operation.Upload(u.String(), - string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime), - pairMap, jwt) + // volume server do not know about encryption + // TODO optimize here to compress data only once + _, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsCompressed(), string(n.Mime), pairMap, jwt) return err }); err != nil { - size = 0 err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) glog.V(0).Infoln(err) } @@ -85,16 +92,16 @@ func ReplicatedWrite(masterNode string, s *storage.Store, return } -func ReplicatedDelete(masterNode string, store *storage.Store, +func ReplicatedDelete(masterFn operation.GetMasterFn, store *storage.Store, volumeId needle.VolumeId, n *needle.Needle, - r *http.Request) (size uint32, err error) { + r *http.Request) (size types.Size, err error) { //check JWT jwt := security.GetJwt(r) var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { - remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode) + remoteLocations, err = getWritableRemoteReplications(store, 
volumeId, masterFn) if err != nil { glog.V(0).Infoln(err) return @@ -154,25 +161,34 @@ func distributedOperation(locations []operation.Location, store *storage.Store, return ret.Error() } -func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) ( +func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterFn operation.GetMasterFn) ( remoteLocations []operation.Location, err error) { - copyCount := s.GetVolume(volumeId).ReplicaPlacement.GetCopyCount() - if copyCount > 1 { - if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { - if len(lookupResult.Locations) < copyCount { - err = fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", - len(lookupResult.Locations), copyCount) - return - } - selfUrl := s.Ip + ":" + strconv.Itoa(s.Port) - for _, location := range lookupResult.Locations { - if location.Url != selfUrl { - remoteLocations = append(remoteLocations, location) - } + + v := s.GetVolume(volumeId) + if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 { + return + } + + // not on local store, or has replications + lookupResult, lookupErr := operation.Lookup(masterFn, volumeId.String()) + if lookupErr == nil { + selfUrl := s.Ip + ":" + strconv.Itoa(s.Port) + for _, location := range lookupResult.Locations { + if location.Url != selfUrl { + remoteLocations = append(remoteLocations, location) } - } else { - err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr) - return + } + } else { + err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr) + return + } + + if v != nil { + // has one local and has remote replications + copyCount := v.ReplicaPlacement.GetCopyCount() + if len(lookupResult.Locations) < copyCount { + err = fmt.Errorf("replicating opetations [%d] is less than volume %d replication copy count [%d]", + len(lookupResult.Locations), volumeId, copyCount) } } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index e6cb44727..08ebd24fd 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -3,8 +3,10 @@ package topology import ( "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" + "time" "github.com/chrislusf/raft" @@ -27,7 +29,8 @@ type Topology struct { pulse int64 - volumeSizeLimit uint64 + volumeSizeLimit uint64 + replicationAsMin bool Sequence sequence.Sequencer @@ -38,16 +41,18 @@ type Topology struct { RaftServer raft.Server } -func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology { +func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" t.NodeImpl.value = t + t.diskUsages = newDiskUsages() t.children = make(map[NodeId]Node) t.collectionMap = util.NewConcurrentReadMap() t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit + t.replicationAsMin = replicationAsMin t.Sequence = seq @@ -60,29 +65,32 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls func (t *Topology) IsLeader() bool { if t.RaftServer != nil { - return t.RaftServer.State() == raft.Leader + if t.RaftServer.State() == raft.Leader { + return true + } } return false } func (t *Topology) Leader() (string, error) { l := "" - if t.RaftServer != nil { - l = t.RaftServer.Leader() - } else { - return "", 
errors.New("Raft Server not ready yet!") - } - - if l == "" { - // We are a single node cluster, we are the leader - return t.RaftServer.Name(), errors.New("Raft Server not initialized!") + for count := 0; count < 3; count++ { + if t.RaftServer != nil { + l = t.RaftServer.Leader() + } else { + return "", errors.New("Raft Server not ready yet!") + } + if l != "" { + break + } else { + time.Sleep(time.Duration(5+count) * time.Second) + } } - return l, nil } func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) { - //maybe an issue if lots of collections? + // maybe an issue if lots of collections? if collection == "" { for _, c := range t.collectionMap.Items() { if list := c.(*Collection).Lookup(vid); list != nil { @@ -115,12 +123,12 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) { } func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option) if err != nil { return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) } @@ -131,10 +139,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { - return NewCollection(collectionName, t.volumeSizeLimit) - }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) + return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin) + }).(*Collection).GetOrCreateVolumeLayout(rp, ttl, diskType) } func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) { @@ -152,7 +160,7 @@ func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) t.ecShardMapLock.RUnlock() } - for k, _ := range mapOfCollections { + for k := range mapOfCollections { ret = append(ret, k) } return ret @@ -170,15 +178,30 @@ func (t *Topology) DeleteCollection(collectionName string) { t.collectionMap.Delete(collectionName) } +func (t *Topology) DeleteLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) { + collection, found := t.FindCollection(collectionName) + if !found { + return + } + collection.DeleteVolumeLayout(rp, ttl, diskType) + if len(collection.storageType2VolumeLayout.Items()) == 0 { + t.DeleteCollection(collectionName) + } +} + func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) + diskType := 
types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) + vl.RegisterVolume(&v, dn) + vl.EnsureCorrectWritables(&v) } func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - glog.Infof("removing volume info:%+v", v) - volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + glog.Infof("removing volume info: %+v", v) + diskType := types.ToDiskType(v.DiskType) + volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) volumeLayout.UnRegisterVolume(&v, dn) if volumeLayout.isEmpty() { - t.DeleteCollection(v.Collection) + t.DeleteLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) } } @@ -205,13 +228,19 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati } } // find out the delta volumes - newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos) + var changedVolumes []storage.VolumeInfo + newVolumes, deletedVolumes, changedVolumes = dn.UpdateVolumes(volumeInfos) for _, v := range newVolumes { t.RegisterVolumeLayout(v, dn) } for _, v := range deletedVolumes { t.UnRegisterVolumeLayout(v, dn) } + for _, v := range changedVolumes { + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) + vl.EnsureCorrectWritables(&v) + } return } diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 93b39bb5d..022eeb578 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -18,6 +18,7 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf for _, shardInfo := range shardInfos { shards = append(shards, erasure_coding.NewEcVolumeInfo( + shardInfo.DiskType, shardInfo.Collection, needle.VolumeId(shardInfo.Id), erasure_coding.ShardBits(shardInfo.EcIndexBits))) @@ -39,6 +40,7 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards for _, shardInfo := range newEcShards { newShards = append(newShards, erasure_coding.NewEcVolumeInfo( + shardInfo.DiskType, shardInfo.Collection, needle.VolumeId(shardInfo.Id), erasure_coding.ShardBits(shardInfo.EcIndexBits))) @@ -46,6 +48,7 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards for _, shardInfo := range deletedEcShards { deletedShards = append(deletedShards, erasure_coding.NewEcVolumeInfo( + shardInfo.DiskType, shardInfo.Collection, needle.VolumeId(shardInfo.Id), erasure_coding.ShardBits(shardInfo.EcIndexBits))) diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index 068bd401e..543dacf29 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -1,6 +1,7 @@ package topology import ( + "github.com/chrislusf/seaweedfs/weed/storage/types" "google.golang.org/grpc" "math/rand" "time" @@ -37,7 +38,8 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) + diskType := types.ToDiskType(volumeInfo.DiskType) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -47,7 +49,13 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { for _, dn := range vl.vid2location[volumeInfo.Id].list { if 
!volumeInfo.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(-1) + + disk := dn.getOrCreateDisk(volumeInfo.DiskType) + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(volumeInfo.DiskType)) + deltaDiskUsage.activeVolumeCount = -1 + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } } return true @@ -55,13 +63,14 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.GetVolumes() { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn.Id()) - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.SetVolumeUnavailable(dn, v.Id) } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) - dn.UpAdjustRemoteVolumeCountDelta(-dn.GetRemoteVolumeCount()) - dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) - dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) + + negativeUsages := dn.GetDiskUsages().negative() + dn.UpAdjustDiskUsageDelta(negativeUsages) + if dn.Parent() != nil { dn.Parent().UnlinkChildNode(dn.Id()) } diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go index 73c55d77d..0fedb6221 100644 --- a/weed/topology/topology_map.go +++ b/weed/topology/topology_map.go @@ -4,8 +4,8 @@ import "github.com/chrislusf/seaweedfs/weed/pb/master_pb" func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) - m["Max"] = t.GetMaxVolumeCount() - m["Free"] = t.FreeSpace() + m["Max"] = t.diskUsages.GetMaxVolumeCount() + m["Free"] = t.diskUsages.FreeSpace() var dcs []interface{} for _, c := range t.Children() { dc := c.(*DataCenter) @@ -29,8 +29,8 @@ func (t *Topology) ToMap() interface{} { func (t *Topology) ToVolumeMap() interface{} { m := make(map[string]interface{}) - m["Max"] = t.GetMaxVolumeCount() - m["Free"] = t.FreeSpace() + m["Max"] = t.diskUsages.GetMaxVolumeCount() + m["Free"] = t.diskUsages.FreeSpace() dcs := make(map[NodeId]interface{}) for _, c := range t.Children() { dc := c.(*DataCenter) @@ -80,12 +80,8 @@ func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocat func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo { m := &master_pb.TopologyInfo{ - Id: string(t.Id()), - VolumeCount: uint64(t.GetVolumeCount()), - MaxVolumeCount: uint64(t.GetMaxVolumeCount()), - FreeVolumeCount: uint64(t.FreeSpace()), - ActiveVolumeCount: uint64(t.GetActiveVolumeCount()), - RemoteVolumeCount: uint64(t.GetRemoteVolumeCount()), + Id: string(t.Id()), + DiskInfos: t.diskUsages.ToDiskInfo(), } for _, c := range t.Children() { dc := c.(*DataCenter) diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index e7676ccf7..ecfe9d8d1 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "testing" ) @@ -13,21 +14,24 @@ import ( func TestRemoveDataCenter(t *testing.T) { topo := setup(topologyLayout) topo.UnlinkChildNode(NodeId("dc2")) - if topo.GetActiveVolumeCount() != 15 { + if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 15 { t.Fail() } topo.UnlinkChildNode(NodeId("dc3")) - if topo.GetActiveVolumeCount() != 12 { + if 
topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 12 { t.Fail() } } func TestHandlingVolumeServerHeartbeat(t *testing.T) { - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25) + maxVolumeCounts := make(map[string]uint32) + maxVolumeCounts[""] = 25 + maxVolumeCounts["ssd"] = 12 + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts) { volumeCount := 7 @@ -48,10 +52,30 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { volumeMessages = append(volumeMessages, volumeMessage) } + for k := 1; k <= volumeCount; k++ { + volumeMessage := &master_pb.VolumeInformationMessage{ + Id: uint32(volumeCount + k), + Size: uint64(25432), + Collection: "", + FileCount: uint64(2343), + DeleteCount: uint64(345), + DeletedByteCount: 34524, + ReadOnly: false, + ReplicaPlacement: uint32(0), + Version: uint32(needle.CurrentVersion), + Ttl: 0, + DiskType: "ssd", + } + volumeMessages = append(volumeMessages, volumeMessage) + } + topo.SyncDataNodeRegistration(volumeMessages, dn) - assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) - assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + usageCounts := topo.diskUsages.usages[types.HardDriveType] + + assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount) + assert(t, "ssdVolumeCount", int(topo.diskUsages.usages[types.SsdType].volumeCount), volumeCount) } { @@ -78,8 +102,10 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { //layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL) //assert(t, "writables", len(layout.writables), volumeCount) - assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) - assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + usageCounts := topo.diskUsages.usages[types.HardDriveType] + + assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount) } { @@ -96,26 +122,28 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { nil, dn) rp, _ := super_block.NewReplicaPlacementFromString("000") - layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL) + layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType) assert(t, "writables after repeated add", len(layout.writables), volumeCount) - assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) - assert(t, "volumeCount", int(topo.volumeCount), volumeCount) + usageCounts := topo.diskUsages.usages[types.HardDriveType] + + assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount) topo.IncrementalSyncDataNodeRegistration( nil, []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, dn) assert(t, "writables after deletion", len(layout.writables), volumeCount-1) - assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1) - assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1) + assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount-1) + assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount-1) topo.IncrementalSyncDataNodeRegistration( 
[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage}, nil, dn) - for vid, _ := range layout.vid2location { + for vid := range layout.vid2location { println("after add volume id", vid) } for _, vid := range layout.writables { @@ -128,7 +156,9 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { topo.UnRegisterDataNode(dn) - assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0) + usageCounts := topo.diskUsages.usages[types.HardDriveType] + + assert(t, "activeVolumeCount2", int(usageCounts.activeVolumeCount), 0) } @@ -140,16 +170,20 @@ func assert(t *testing.T, message string, actual, expected int) { func TestAddRemoveVolume(t *testing.T) { - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25) + maxVolumeCounts := make(map[string]uint32) + maxVolumeCounts[""] = 25 + maxVolumeCounts["ssd"] = 12 + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts) v := storage.VolumeInfo{ Id: needle.VolumeId(1), Size: 100, Collection: "xcollection", + DiskType: "ssd", FileCount: 123, DeleteCount: 23, DeletedByteCount: 45, diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index ca626e973..9feb55b73 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -5,15 +5,16 @@ import ( "sync/atomic" "time" - "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) -func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, +func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) { ch := make(chan int, locationlist.Length()) errCount := int32(0) @@ -41,19 +42,23 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi }(index, dn.Url(), vid) } vacuumLocationList := NewVolumeLocationList() + + waitTimeout := time.NewTimer(time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1)) + defer waitTimeout.Stop() + for range locationlist.list { select { case index := <-ch: if index != -1 { vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index]) } - case <-time.After(30 * time.Minute): + case <-waitTimeout.C: return vacuumLocationList, false } } return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0 } -func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, +func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { vl.accessLock.Lock() vl.removeFromWritable(vid) @@ -65,7 +70,8 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), 
&volume_server_pb.VacuumVolumeCompactRequest{ - VolumeId: uint32(vid), + VolumeId: uint32(vid), + Preallocate: preallocate, }) return err }) @@ -79,24 +85,32 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, }(index, dn.Url(), vid) } isVacuumSuccess := true + + waitTimeout := time.NewTimer(3 * time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1)) + defer waitTimeout.Stop() + for range locationlist.list { select { case canCommit := <-ch: isVacuumSuccess = isVacuumSuccess && canCommit - case <-time.After(30 * time.Minute): + case <-waitTimeout.C: return false } } return isVacuumSuccess } -func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { +func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { isCommitSuccess := true + isReadOnly := false for _, dn := range locationlist.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ + resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: uint32(vid), }) + if resp != nil && resp.IsReadOnly { + isReadOnly = true + } return err }) if err != nil { @@ -105,13 +119,15 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v } else { glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url()) } - if isCommitSuccess { - vl.SetVolumeAvailable(dn, vid) + } + if isCommitSuccess { + for _, dn := range locationlist.list { + vl.SetVolumeAvailable(dn, vid, isReadOnly) } } return isCommitSuccess } -func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { +func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -128,12 +144,12 @@ func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, } } -func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int { +func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) { // if there is vacuum going on, return immediately swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) if !swapped { - return 0 + return } defer atomic.StoreInt64(&t.vacuumLockCounter, 0) @@ -145,39 +161,37 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { volumeLayout := vl.(*VolumeLayout) - vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) + t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) } } } - return 0 } -func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate 
int64) { +func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { volumeLayout.accessLock.RLock() tmpMap := make(map[needle.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { - tmpMap[vid] = locationList + tmpMap[vid] = locationList.Copy() } volumeLayout.accessLock.RUnlock() for vid, locationList := range tmpMap { volumeLayout.accessLock.RLock() - isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] + isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid) volumeLayout.accessLock.RUnlock() - if hasValue && isReadOnly { + if isReadOnly { continue } glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) - if vacuumLocationList, needVacuum := batchVacuumVolumeCheck( - grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum { - if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { - batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) + if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck(grpcDialOption, vid, locationList, garbageThreshold); needVacuum { + if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { + t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) } else { - batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) + t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) } } } diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 80fbc86cd..8941a049b 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -1,12 +1,15 @@ package topology import ( + "encoding/json" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" @@ -23,14 +26,15 @@ This package is created to resolve these replica placement issues: */ type VolumeGrowOption struct { - Collection string - ReplicaPlacement *super_block.ReplicaPlacement - Ttl *needle.TTL - Prealloacte int64 - DataCenter string - Rack string - DataNode string - MemoryMapMaxSizeMb uint32 + Collection string `json:"collection,omitempty"` + ReplicaPlacement *super_block.ReplicaPlacement `json:"replication,omitempty"` + Ttl *needle.TTL `json:"ttl,omitempty"` + DiskType types.DiskType `json:"disk,omitempty"` + Prealloacte int64 `json:"prealloacte,omitempty"` + DataCenter string `json:"dataCenter,omitempty"` + Rack string `json:"rack,omitempty"` + DataNode string `json:"dataNode,omitempty"` + MemoryMapMaxSizeMb uint32 `json:"memoryMapMaxSizeMb,omitempty"` } type VolumeGrowth struct { @@ -38,7 +42,8 @@ type VolumeGrowth struct { } func (o *VolumeGrowOption) String() string { - return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode) + blob, _ := json.Marshal(o) + return string(blob) } func NewDefaultVolumeGrowth() *VolumeGrowth { @@ -48,15 +53,20 @@ func NewDefaultVolumeGrowth() *VolumeGrowth { // one replication type may need rp.GetCopyCount() actual volumes // given copyCount, how many logical volumes to create func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { + v := 
util.GetViper() + v.SetDefault("master.volume_growth.copy_1", 7) + v.SetDefault("master.volume_growth.copy_2", 6) + v.SetDefault("master.volume_growth.copy_3", 3) + v.SetDefault("master.volume_growth.copy_other", 1) switch copyCount { case 1: - count = 7 + count = v.GetInt("master.volume_growth.copy_1") case 2: - count = 6 + count = v.GetInt("master.volume_growth.copy_2") case 3: - count = 3 + count = v.GetInt("master.volume_growth.copy_3") default: - count = 1 + count = v.GetInt("master.volume_growth.copy_other") } return } @@ -79,6 +89,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targe if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { counter += c } else { + glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e) return counter, e } } @@ -106,21 +117,21 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { //find main datacenter and other data centers rp := option.ReplicaPlacement - mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { + mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) } if len(node.Children()) < rp.DiffRackCount+1 { return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) } - if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) + if node.AvailableSpaceFor(option) < int64(rp.DiffRackCount+rp.SameRackCount+1) { + return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.DiffRackCount+rp.SameRackCount+1) } possibleRacksCount := 0 for _, rack := range node.Children() { possibleDataNodesCount := 0 for _, n := range rack.Children() { - if n.FreeSpace() >= 1 { + if n.AvailableSpaceFor(option) >= 1 { possibleDataNodesCount++ } } @@ -138,12 +149,12 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } //find main rack and other racks - mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error { + mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, option, func(node Node) error { if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } - if node.FreeSpace() < int64(rp.SameRackCount+1) { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) + if node.AvailableSpaceFor(option) < int64(rp.SameRackCount+1) { + return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.SameRackCount+1) } if len(node.Children()) < rp.SameRackCount+1 { // a bit faster way to test free racks @@ -151,7 +162,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } possibleDataNodesCount := 0 for _, n := range node.Children() { - if n.FreeSpace() >= 1 { + if n.AvailableSpaceFor(option) >= 1 { possibleDataNodesCount++ } } @@ -165,12 +176,12 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo 
*Topology, option *Volum } //find main rack and other racks - mainServer, otherServers, serverErr := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error { + mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, option, func(node Node) error { if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) } - if node.FreeSpace() < 1 { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) + if node.AvailableSpaceFor(option) < 1 { + return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), 1) } return nil }) @@ -183,16 +194,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum servers = append(servers, server.(*DataNode)) } for _, rack := range otherRacks { - r := rand.Int63n(rack.FreeSpace()) - if server, e := rack.ReserveOneVolume(r); e == nil { + r := rand.Int63n(rack.AvailableSpaceFor(option)) + if server, e := rack.ReserveOneVolume(r, option); e == nil { servers = append(servers, server) } else { return servers, e } } for _, datacenter := range otherDataCenters { - r := rand.Int63n(datacenter.FreeSpace()) - if server, e := datacenter.ReserveOneVolume(r); e == nil { + r := rand.Int63n(datacenter.AvailableSpaceFor(option)) + if server, e := datacenter.ReserveOneVolume(r, option); e == nil { servers = append(servers, server) } else { return servers, e @@ -211,6 +222,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid ReplicaPlacement: option.ReplicaPlacement, Ttl: option.Ttl, Version: needle.CurrentVersion, + DiskType: string(option.DiskType), } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index e3c5cc580..ab30cd43f 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -81,7 +81,7 @@ func setup(topologyLayout string) *Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) @@ -103,7 +103,13 @@ func setup(topologyLayout string) *Topology { Version: needle.CurrentVersion} server.AddOrUpdateVolume(vi) } - server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64))) + + disk := server.getOrCreateDisk("") + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("") + deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64)) + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } } } @@ -131,3 +137,212 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) { fmt.Println("assigned node :", server.Id()) } } + +var topologyLayout2 = ` +{ + "dc1":{ + "rack1":{ + "server111":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":2, "size":12312}, + {"id":3, "size":12312} + ], + "limit":300 + }, + "server112":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":300 + }, + "server113":{ + "volumes":[], + "limit":300 + }, + "server114":{ + "volumes":[], + "limit":300 + }, + "server115":{ + "volumes":[], + "limit":300 + }, + "server116":{ + "volumes":[], + "limit":300 + } + }, + "rack2":{ + "server121":{ 
+ "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":300 + }, + "server122":{ + "volumes":[], + "limit":300 + }, + "server123":{ + "volumes":[ + {"id":2, "size":12312}, + {"id":3, "size":12312}, + {"id":4, "size":12312} + ], + "limit":300 + }, + "server124":{ + "volumes":[], + "limit":300 + }, + "server125":{ + "volumes":[], + "limit":300 + }, + "server126":{ + "volumes":[], + "limit":300 + } + }, + "rack3":{ + "server131":{ + "volumes":[], + "limit":300 + }, + "server132":{ + "volumes":[], + "limit":300 + }, + "server133":{ + "volumes":[], + "limit":300 + }, + "server134":{ + "volumes":[], + "limit":300 + }, + "server135":{ + "volumes":[], + "limit":300 + }, + "server136":{ + "volumes":[], + "limit":300 + } + } + } +} +` + +func TestReplication011(t *testing.T) { + topo := setup(topologyLayout2) + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("011") + volumeGrowOption := &VolumeGrowOption{ + Collection: "MAIL", + ReplicaPlacement: rp, + DataCenter: "dc1", + Rack: "", + DataNode: "", + } + servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + if err != nil { + fmt.Println("finding empty slots error :", err) + t.Fail() + } + for _, server := range servers { + fmt.Println("assigned node :", server.Id()) + } +} + +var topologyLayout3 = ` +{ + "dc1":{ + "rack1":{ + "server111":{ + "volumes":[], + "limit":2000 + } + } + }, + "dc2":{ + "rack2":{ + "server222":{ + "volumes":[], + "limit":2000 + } + } + }, + "dc3":{ + "rack3":{ + "server333":{ + "volumes":[], + "limit":1000 + } + } + }, + "dc4":{ + "rack4":{ + "server444":{ + "volumes":[], + "limit":1000 + } + } + }, + "dc5":{ + "rack5":{ + "server555":{ + "volumes":[], + "limit":500 + } + } + }, + "dc6":{ + "rack6":{ + "server666":{ + "volumes":[], + "limit":500 + } + } + } +} +` + +func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) { + topo := setup(topologyLayout3) + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("100") + volumeGrowOption := &VolumeGrowOption{ + Collection: "Weight", + ReplicaPlacement: rp, + DataCenter: "", + Rack: "", + DataNode: "", + } + + distribution := map[NodeId]int{} + // assign 1000 volumes + for i := 0; i < 1000; i++ { + servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + if err != nil { + fmt.Println("finding empty slots error :", err) + t.Fail() + } + for _, server := range servers { + // fmt.Println("assigned node :", server.Id()) + if _, ok := distribution[server.id]; !ok { + distribution[server.id] = 0 + } + distribution[server.id] += 1 + } + } + + for k, v := range distribution { + fmt.Printf("%s : %d\n", k, v) + } +} diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 7633b28be..c7e171248 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -3,6 +3,7 @@ package topology import ( "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" "time" @@ -13,15 +14,103 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) +type copyState int + +const ( + noCopies copyState = 0 + iota + insufficientCopies + enoughCopies +) + +type volumeState string + +const ( + readOnlyState volumeState = "ReadOnly" + oversizedState = "Oversized" +) + +type stateIndicator func(copyState) bool + +func ExistCopies() stateIndicator { + return func(state copyState) bool { return state != noCopies } +} + +func NoCopies() stateIndicator { + return 
func(state copyState) bool { return state == noCopies } +} + +type volumesBinaryState struct { + rp *super_block.ReplicaPlacement + name volumeState // the name for volume state (eg. "Readonly", "Oversized") + indicator stateIndicator // indicate whether the volumes should be marked as `name` + copyMap map[needle.VolumeId]*VolumeLocationList +} + +func NewVolumesBinaryState(name volumeState, rp *super_block.ReplicaPlacement, indicator stateIndicator) *volumesBinaryState { + return &volumesBinaryState{ + rp: rp, + name: name, + indicator: indicator, + copyMap: make(map[needle.VolumeId]*VolumeLocationList), + } +} + +func (v *volumesBinaryState) Dump() (res []uint32) { + for vid, list := range v.copyMap { + if v.indicator(v.copyState(list)) { + res = append(res, uint32(vid)) + } + } + return +} + +func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool { + list, _ := v.copyMap[vid] + return v.indicator(v.copyState(list)) +} + +func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Set(dn) + return + } + list = NewVolumeLocationList() + list.Set(dn) + v.copyMap[vid] = list +} + +func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Remove(dn) + if list.Length() == 0 { + delete(v.copyMap, vid) + } + } +} + +func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { + if list == nil { + return noCopies + } + if list.Length() < v.rp.GetCopyCount() { + return insufficientCopies + } + return enoughCopies +} + // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *super_block.ReplicaPlacement ttl *needle.TTL + diskType types.DiskType vid2location map[needle.VolumeId]*VolumeLocationList - writables []needle.VolumeId // transient array of writable volume id - readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes *volumesBinaryState // readonly volumes + oversizedVolumes *volumesBinaryState // oversized volumes volumeSizeLimit uint64 + replicationAsMin bool accessLock sync.RWMutex } @@ -31,19 +120,23 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, + diskType: diskType, vid2location: make(map[needle.VolumeId]*VolumeLocationList), writables: *new([]needle.VolumeId), - readonlyVolumes: make(map[needle.VolumeId]bool), - oversizedVolumes: make(map[needle.VolumeId]bool), + readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()), + oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()), volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, } } func (vl *VolumeLayout) String() string { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit) } @@ -51,6 +144,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() + defer 
vl.rememberOversizedVolume(v, dn) + if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() } @@ -61,27 +156,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if vInfo.ReadOnly { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - vl.readonlyVolumes[v.Id] = true + vl.readonlyVolumes.Add(v.Id, dn) return } else { - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) } } else { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) return } } - vl.rememberOversizedVolume(v) - vl.ensureCorrectWritables(v) - } -func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) { if vl.isOversized(v) { - vl.oversizedVolumes[v.Id] = true + vl.oversizedVolumes.Add(v.Id, dn) + } else { + vl.oversizedVolumes.Remove(v.Id, dn) } } @@ -97,7 +191,9 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if location.Remove(dn) { - vl.ensureCorrectWritables(v) + vl.readonlyVolumes.Remove(v.Id, dn) + vl.oversizedVolumes.Remove(v.Id, dn) + vl.ensureCorrectWritables(v.Id) if location.Length() == 0 { delete(vl.vid2location, v.Id) @@ -106,23 +202,32 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } -func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { - vl.addToWritable(v.Id) +func (vl *VolumeLayout) EnsureCorrectWritables(v *storage.VolumeInfo) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + vl.ensureCorrectWritables(v.Id) +} + +func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) { + if vl.enoughCopies(vid) && vl.isAllWritable(vid) { + if !vl.oversizedVolumes.IsTrue(vid) { + vl.setVolumeWritable(vid) } } else { - vl.removeFromWritable(v.Id) + vl.removeFromWritable(vid) } } -func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) { - for _, id := range vl.writables { - if vid == id { - return +func (vl *VolumeLayout) isAllWritable(vid needle.VolumeId) bool { + for _, dn := range vl.vid2location[vid].list { + if v, getError := dn.GetVolumesById(vid); getError == nil { + if v.ReadOnly { + return false + } } } - vl.writables = append(vl.writables, vid) + return true } func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { @@ -258,6 +363,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) if location, ok := vl.vid2location[vid]; ok { if location.Remove(dn) { + vl.readonlyVolumes.Remove(vid, dn) + vl.oversizedVolumes.Remove(vid, dn) if location.Length() < vl.rp.GetCopyCount() { glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount()) return vl.removeFromWritable(vid) @@ -266,17 +373,33 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) } return false } -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() + vInfo, err := dn.GetVolumesById(vid) + if err != nil { + return false + } + vl.vid2location[vid].Set(dn) - if vl.vid2location[vid].Length() == 
vl.rp.GetCopyCount() { + + if vInfo.ReadOnly || isReadOnly { + return false + } + + if vl.enoughCopies(vid) { return vl.setVolumeWritable(vid) } return false } +func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool { + locations := vl.vid2location[vid].Length() + desired := vl.rp.GetCopyCount() + return locations == desired || (vl.replicationAsMin && locations > desired) +} + func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -306,10 +429,10 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { size, fileCount := vll.Stats(vid, freshThreshold) ret.FileCount += uint64(fileCount) ret.UsedSize += size - if vl.readonlyVolumes[vid] { + if vl.readonlyVolumes.IsTrue(vid) { ret.TotalSize += size } else { - ret.TotalSize += vl.volumeSizeLimit + ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length()) } } diff --git a/weed/topology/volume_layout_test.go b/weed/topology/volume_layout_test.go new file mode 100644 index 000000000..e148d6107 --- /dev/null +++ b/weed/topology/volume_layout_test.go @@ -0,0 +1,116 @@ +package topology + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +func TestVolumesBinaryState(t *testing.T) { + vids := []needle.VolumeId{ + needle.VolumeId(1), + needle.VolumeId(2), + needle.VolumeId(3), + needle.VolumeId(4), + needle.VolumeId(5), + } + + dns := []*DataNode{ + &DataNode{ + Ip: "127.0.0.1", + Port: 8081, + }, + &DataNode{ + Ip: "127.0.0.1", + Port: 8082, + }, + &DataNode{ + Ip: "127.0.0.1", + Port: 8083, + }, + } + + rp, _ := super_block.NewReplicaPlacementFromString("002") + + state_exist := NewVolumesBinaryState(readOnlyState, rp, ExistCopies()) + state_exist.Add(vids[0], dns[0]) + state_exist.Add(vids[0], dns[1]) + state_exist.Add(vids[1], dns[2]) + state_exist.Add(vids[2], dns[1]) + state_exist.Add(vids[4], dns[1]) + state_exist.Add(vids[4], dns[2]) + + state_no := NewVolumesBinaryState(readOnlyState, rp, NoCopies()) + state_no.Add(vids[0], dns[0]) + state_no.Add(vids[0], dns[1]) + state_no.Add(vids[3], dns[1]) + + tests := []struct { + name string + state *volumesBinaryState + expectResult []bool + update func() + expectResultAfterUpdate []bool + }{ + { + name: "mark true when exist copies", + state: state_exist, + expectResult: []bool{true, true, true, false, true}, + update: func() { + state_exist.Remove(vids[0], dns[2]) + state_exist.Remove(vids[1], dns[2]) + state_exist.Remove(vids[3], dns[2]) + state_exist.Remove(vids[4], dns[1]) + state_exist.Remove(vids[4], dns[2]) + }, + expectResultAfterUpdate: []bool{true, false, true, false, false}, + }, + { + name: "mark true when inexist copies", + state: state_no, + expectResult: []bool{false, true, true, false, true}, + update: func() { + state_no.Remove(vids[0], dns[2]) + state_no.Remove(vids[1], dns[2]) + state_no.Add(vids[2], dns[1]) + state_no.Remove(vids[3], dns[1]) + state_no.Remove(vids[4], dns[2]) + }, + expectResultAfterUpdate: []bool{false, true, false, true, true}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var result []bool + for index, _ := range vids { + result = append(result, test.state.IsTrue(vids[index])) + } + if len(result) != len(test.expectResult) { + t.Fatalf("len(result) != len(expectResult), got %d, expected %d\n", + len(result), len(test.expectResult)) + } + for index, val := range result { + if val != test.expectResult[index] { + t.Fatalf("result not matched, index %d, got 
%v, expect %v\n", + index, val, test.expectResult[index]) + } + } + test.update() + var updateResult []bool + for index, _ := range vids { + updateResult = append(updateResult, test.state.IsTrue(vids[index])) + } + if len(updateResult) != len(test.expectResultAfterUpdate) { + t.Fatalf("len(updateResult) != len(expectResultAfterUpdate), got %d, expected %d\n", + len(updateResult), len(test.expectResultAfterUpdate)) + } + for index, val := range updateResult { + if val != test.expectResultAfterUpdate[index] { + t.Fatalf("update result not matched, index %d, got %v, expect %v\n", + index, val, test.expectResultAfterUpdate[index]) + } + } + }) + } +} diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index 8905c54b5..548c4cd25 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -18,12 +18,23 @@ func (dnll *VolumeLocationList) String() string { return fmt.Sprintf("%v", dnll.list) } +func (dnll *VolumeLocationList) Copy() *VolumeLocationList { + list := make([]*DataNode, len(dnll.list)) + copy(list, dnll.list) + return &VolumeLocationList{ + list: list, + } +} + func (dnll *VolumeLocationList) Head() *DataNode { //mark first node as master volume return dnll.list[0] } func (dnll *VolumeLocationList) Length() int { + if dnll == nil { + return 0 + } return len(dnll.list) } @@ -71,7 +82,7 @@ func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64 if dnl.LastSeen < freshThreshHold { vinfo, err := dnl.GetVolumesById(vid) if err == nil { - return vinfo.Size - vinfo.DeletedByteCount, vinfo.FileCount - vinfo.DeleteCount + return (vinfo.Size - vinfo.DeletedByteCount) * uint64(len(dnll.list)), vinfo.FileCount - vinfo.DeleteCount } } } |
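In topology.go above, every layout lookup now carries a disk type: the volume's DiskType string is normalized with types.ToDiskType and passed to GetVolumeLayout alongside collection, replica placement, and TTL, and an emptied layout is dropped with DeleteLayout for that same tuple. A minimal sketch of keying layouts by that four-part tuple (the struct and helper below are illustrative stand-ins, not the project's types):

package main

import "fmt"

// DiskType mirrors the idea of a normalized disk-type tag ("" for the default hard drive tier).
type DiskType string

const HardDrive DiskType = ""

// toDiskType sketches the normalization the diff relies on: unknown or empty
// strings fall back to the default hard-drive type (assumption for illustration).
func toDiskType(s string) DiskType {
	if s == "ssd" {
		return DiskType("ssd")
	}
	return HardDrive
}

// layoutKey shows a layout being selected per (collection, replication, ttl, diskType).
type layoutKey struct {
	collection  string
	replication string
	ttl         string
	diskType    DiskType
}

func main() {
	layouts := map[layoutKey]string{}
	k := layoutKey{collection: "pictures", replication: "010", ttl: "", diskType: toDiskType("ssd")}
	layouts[k] = "volume layout for ssd-backed pictures"
	fmt.Println(layouts[k])
}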
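The SetVolumeCapacityFull and UnRegisterDataNode changes replace the individual UpAdjust*CountDelta calls with per-disk-type usage deltas: a fresh delta set is built with newDiskUsages, the one counter that changed is set (activeVolumeCount = -1 when a volume fills up), and the whole delta is pushed up through UpAdjustDiskUsageDelta; removing a dead node simply pushes the negation of everything it contributed via GetDiskUsages().negative(). A simplified sketch of that delta-and-negate pattern, with a reduced field set for illustration:

package main

import "fmt"

// diskUsageCounts is a simplified per-disk-type counter set.
type diskUsageCounts struct {
	volumeCount       int64
	activeVolumeCount int64
	maxVolumeCount    int64
}

// add applies a delta in place; the real code propagates the delta up the topology tree.
func (u *diskUsageCounts) add(d *diskUsageCounts) {
	u.volumeCount += d.volumeCount
	u.activeVolumeCount += d.activeVolumeCount
	u.maxVolumeCount += d.maxVolumeCount
}

// negative returns a delta that cancels the current counts,
// which is what unregistering a dead data node needs.
func (u *diskUsageCounts) negative() *diskUsageCounts {
	return &diskUsageCounts{
		volumeCount:       -u.volumeCount,
		activeVolumeCount: -u.activeVolumeCount,
		maxVolumeCount:    -u.maxVolumeCount,
	}
}

func main() {
	node := &diskUsageCounts{volumeCount: 7, activeVolumeCount: 7, maxVolumeCount: 25}

	// a volume becomes full: only the active count changes, expressed as a delta
	node.add(&diskUsageCounts{activeVolumeCount: -1})
	fmt.Printf("after full volume: %+v\n", *node)

	// the node dies: push the negation of everything it contributed
	node.add(node.negative())
	fmt.Printf("after unregister: %+v\n", *node)
}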
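In topology_vacuum.go the fixed 30-minute time.After is replaced by a stoppable timer whose length scales with the configured volume size limit: roughly one minute per GB plus one for the check phase, and three times that for compaction, so a 30 GB limit waits about 31 and 93 minutes respectively. The arithmetic, restated on its own:

package main

import (
	"fmt"
	"time"
)

// vacuumTimeouts restates the timer math from the diff: volumeSizeLimit is in bytes,
// the check phase waits about one minute per GB plus one, and compaction waits 3x that.
func vacuumTimeouts(volumeSizeLimit uint64) (check, compact time.Duration) {
	minutes := time.Duration(volumeSizeLimit/1024/1024/1000 + 1)
	check = time.Minute * minutes
	compact = 3 * time.Minute * minutes
	return
}

func main() {
	check, compact := vacuumTimeouts(30 * 1024 * 1024 * 1024) // 30 GB volume size limit
	fmt.Println(check, compact)                               // 31m0s 1h33m0s

	// the timers are stoppable, unlike the old time.After(30 * time.Minute)
	t := time.NewTimer(check)
	defer t.Stop()
}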
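vacuumOneVolumeLayout also stops iterating over the live location lists: the temporary map now holds locationList.Copy() snapshots taken under the layout's read lock, and VolumeLocationList.Length() is made nil-safe. The snapshot-under-RLock pattern in isolation, with hypothetical types rather than the project's:

package main

import (
	"fmt"
	"sync"
)

type locationList struct {
	list []string
}

// Copy returns an independent snapshot so callers can iterate without holding the lock.
func (l *locationList) Copy() *locationList {
	c := make([]string, len(l.list))
	copy(c, l.list)
	return &locationList{list: c}
}

type layout struct {
	mu           sync.RWMutex
	vid2location map[uint32]*locationList
}

// snapshot grabs copies of every location list under the read lock.
func (vl *layout) snapshot() map[uint32]*locationList {
	vl.mu.RLock()
	defer vl.mu.RUnlock()
	tmp := make(map[uint32]*locationList, len(vl.vid2location))
	for vid, ll := range vl.vid2location {
		tmp[vid] = ll.Copy()
	}
	return tmp
}

func main() {
	vl := &layout{vid2location: map[uint32]*locationList{
		1: {list: []string{"127.0.0.1:8080", "127.0.0.1:8081"}},
	}}
	for vid, ll := range vl.snapshot() {
		fmt.Println(vid, ll.list) // safe to use after the lock is released
	}
}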
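findVolumeCount in volume_growth.go is no longer hard-coded: the number of volumes grown per copy count is read from master.volume_growth.copy_1 / copy_2 / copy_3 / copy_other, with the old constants (7, 6, 3, 1) registered as defaults. The real code goes through the project's util.GetViper helper; here is a stand-alone sketch using spf13/viper directly:

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// volumesToGrow returns how many logical volumes to create for a given copy count,
// reading overrides from configuration and falling back to the historical defaults.
func volumesToGrow(v *viper.Viper, copyCount int) int {
	v.SetDefault("master.volume_growth.copy_1", 7)
	v.SetDefault("master.volume_growth.copy_2", 6)
	v.SetDefault("master.volume_growth.copy_3", 3)
	v.SetDefault("master.volume_growth.copy_other", 1)
	switch copyCount {
	case 1:
		return v.GetInt("master.volume_growth.copy_1")
	case 2:
		return v.GetInt("master.volume_growth.copy_2")
	case 3:
		return v.GetInt("master.volume_growth.copy_3")
	default:
		return v.GetInt("master.volume_growth.copy_other")
	}
}

func main() {
	v := viper.New()
	v.Set("master.volume_growth.copy_2", 4) // e.g. an override supplied by the master config
	fmt.Println(volumesToGrow(v, 1), volumesToGrow(v, 2)) // 7 4
}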
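findEmptySlotsForOneVolume becomes capacity-aware in two ways: free-slot checks use AvailableSpaceFor(option), so only the requested disk type is counted, and candidate nodes are drawn with PickNodesByWeight instead of RandomlyPickNodes, presumably giving larger nodes proportionally more assignments (the new TestFindEmptySlotsForOneVolumeScheduleByWeight only prints the resulting distribution across data centers with limits of 2000, 1000, and 500 rather than asserting exact numbers). A generic weighted-pick sketch under the assumption that weight equals available slots; this is not the project's implementation:

package main

import (
	"fmt"
	"math/rand"
)

type node struct {
	id        string
	available int64 // free volume slots for the requested disk type
}

// pickByWeight chooses one node with probability proportional to its available slots.
func pickByWeight(nodes []node) node {
	var total int64
	for _, n := range nodes {
		total += n.available
	}
	r := rand.Int63n(total)
	for _, n := range nodes {
		if r < n.available {
			return n
		}
		r -= n.available
	}
	return nodes[len(nodes)-1] // unreachable while total > 0
}

func main() {
	nodes := []node{{"dc1", 2000}, {"dc2", 2000}, {"dc3", 1000}, {"dc5", 500}}
	counts := map[string]int{}
	for i := 0; i < 10000; i++ {
		counts[pickByWeight(nodes).id]++
	}
	fmt.Println(counts) // dc1 and dc2 should land roughly 4x as many picks as dc5
}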
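volume_layout.go replaces the plain readonlyVolumes and oversizedVolumes sets with volumesBinaryState, which remembers which DataNode reported each condition and derives the flag from the replica count through a stateIndicator (ExistCopies or NoCopies over noCopies / insufficientCopies / enoughCopies). Roughly how it behaves, following the shape of the new volume_layout_test.go; this snippet assumes it sits inside the topology package, since the type is unexported:

package topology

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

// demoReadonlyTracking: with the ExistCopies indicator, a volume stays "true"
// for as long as at least one data node still reports the condition.
func demoReadonlyTracking() {
	rp, _ := super_block.NewReplicaPlacementFromString("002")
	state := NewVolumesBinaryState(readOnlyState, rp, ExistCopies())

	vid := needle.VolumeId(1)
	dn1 := &DataNode{Ip: "127.0.0.1", Port: 8081}
	dn2 := &DataNode{Ip: "127.0.0.1", Port: 8082}

	state.Add(vid, dn1)
	state.Add(vid, dn2)
	fmt.Println(state.IsTrue(vid)) // true: copies exist

	state.Remove(vid, dn1)
	fmt.Println(state.IsTrue(vid)) // still true: dn2 still reports it

	state.Remove(vid, dn2)
	fmt.Println(state.IsTrue(vid)) // false: no copies left, entry dropped
}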
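ensureCorrectWritables is reworked to take a volume id and to demand three things before re-adding it to writables: enough copies (an exact match, or at least the desired count when the master treats replication as a minimum), no oversized flag, and no read-only replica anywhere (isAllWritable asks each data node for its view of the volume). A condensed restatement of that decision on plain values:

package main

import "fmt"

// shouldBeWritable condenses the new checks in ensureCorrectWritables:
// enough copies (exact, or at least the desired count when replication is a minimum),
// not flagged oversized, and no replica reporting itself read-only.
func shouldBeWritable(readOnlyFlags []bool, desired int, replicationAsMin, oversized bool) bool {
	n := len(readOnlyFlags)
	enough := n == desired || (replicationAsMin && n > desired)
	if !enough || oversized {
		return false
	}
	for _, readOnly := range readOnlyFlags {
		if readOnly {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(shouldBeWritable([]bool{false, false}, 2, false, false))        // true
	fmt.Println(shouldBeWritable([]bool{false, true}, 2, false, false))         // false: a replica is read-only
	fmt.Println(shouldBeWritable([]bool{false, false, false}, 2, false, false)) // false: strict replication wants exactly 2
	fmt.Println(shouldBeWritable([]bool{false, false, false}, 2, true, false))  // true: replication treated as a minimum
}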
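batchVacuumVolumeCommit now also reads IsReadOnly from the commit responses and defers re-publishing until every replica has committed; SetVolumeAvailable gains an isReadOnly argument and registers the location without ever marking a read-only volume writable. Roughly the decision logic, with a stand-in result type rather than the real RPC response:

package main

import "fmt"

// commitResult stands in for the VacuumVolumeCommit response fields used in the diff.
type commitResult struct {
	err        error
	isReadOnly bool
}

// publishAfterCommit sketches the new flow: any read-only report is remembered,
// locations are only re-published when every commit succeeded, and a read-only
// volume is never put back on the writable list.
func publishAfterCommit(results []commitResult) (published, writable bool) {
	commitOK, readOnly := true, false
	for _, r := range results {
		if r.err != nil {
			commitOK = false
		}
		if r.isReadOnly {
			readOnly = true
		}
	}
	if !commitOK {
		return false, false
	}
	return true, !readOnly
}

func main() {
	fmt.Println(publishAfterCommit([]commitResult{{nil, false}, {nil, false}})) // true true
	fmt.Println(publishAfterCommit([]commitResult{{nil, false}, {nil, true}}))  // true false
}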
