diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-02-16 02:47:02 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-02-16 02:47:02 -0800 |
| commit | f8446b42abd7f3c6c0a298dbbb8641e466891561 (patch) | |
| tree | 84005ad6433f8f1d734624eba1e3c9166208f50f /weed/topology/data_node_ec.go | |
| parent | 71f0c195157b79223a3c8e35a57da10b7ff0720d (diff) | |
| download | seaweedfs-f8446b42abd7f3c6c0a298dbbb8641e466891561.tar.xz seaweedfs-f8446b42abd7f3c6c0a298dbbb8641e466891561.zip | |
this can compile now!!!
Diffstat (limited to 'weed/topology/data_node_ec.go')
| -rw-r--r-- | weed/topology/data_node_ec.go | 124 |
1 files changed, 66 insertions, 58 deletions
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index 75c8784fe..be6df3b8a 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -3,12 +3,14 @@ package topology import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { dn.RLock() - for _, ecVolumeInfo := range dn.ecShards { - ret = append(ret, ecVolumeInfo) + for _, c := range dn.children { + disk := c.(*Disk) + ret = append(ret, disk.GetEcShards()...) } dn.RUnlock() return ret @@ -21,10 +23,17 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) actualEcShardMap[ecShards.VolumeId] = ecShards } + existingEcShards := dn.GetEcShards() + // found out the newShards and deletedShards var newShardCount, deletedShardCount int - dn.ecShardsLock.RLock() - for vid, ecShards := range dn.ecShards { + for _, ecShards := range existingEcShards { + + disk := dn.getOrCreateDisk(ecShards.DiskType) + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(ecShards.DiskType)) + + vid := ecShards.VolumeId if actualEcShards, ok := actualEcShardMap[vid]; !ok { // dn registered ec shards not found in the new set of ec shards deletedShards = append(deletedShards, ecShards) @@ -42,26 +51,61 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) deletedShardCount += d.ShardIdCount() } } + + deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount) + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } for _, ecShards := range actualShards { - if _, found := dn.ecShards[ecShards.VolumeId]; !found { + + disk := dn.getOrCreateDisk(ecShards.DiskType) + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(ecShards.DiskType)) + + if dn.hasEcShards(ecShards.VolumeId) { newShards = append(newShards, ecShards) newShardCount += ecShards.ShardIdCount() } + + deltaDiskUsage.ecShardCount = int64(newShardCount) + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } - dn.ecShardsLock.RUnlock() if len(newShards) > 0 || len(deletedShards) > 0 { // if changed, set to the new ec shard map - dn.ecShardsLock.Lock() - dn.ecShards = actualEcShardMap - dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount)) - dn.ecShardsLock.Unlock() + dn.doUpdateEcShards(actualShards) } return } +func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) { + dn.RLock() + defer dn.RUnlock() + for _, c := range dn.children { + disk := c.(*Disk) + _, found = disk.ecShards[volumeId] + if found { + return + } + } + return +} + +func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) { + dn.Lock() + for _, c := range dn.children { + disk := c.(*Disk) + disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) + } + for _, shard := range actualShards { + disk := dn.getOrCreateDisk(shard.DiskType) + disk.ecShards[shard.VolumeId] = shard + } + dn.Unlock() +} + func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) { for _, newShard := range newShards { @@ -75,61 +119,25 @@ func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_codi } func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { - dn.ecShardsLock.Lock() - defer dn.ecShardsLock.Unlock() - - delta := 0 - if existing, ok := dn.ecShards[s.VolumeId]; !ok { - dn.ecShards[s.VolumeId] = s - delta = s.ShardBits.ShardIdCount() - } else { - oldCount := existing.ShardBits.ShardIdCount() - existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) - delta = existing.ShardBits.ShardIdCount() - oldCount - } - - dn.UpAdjustEcShardCountDelta(int64(delta)) - + disk := dn.getOrCreateDisk(s.DiskType) + disk.AddOrUpdateEcShard(s) } func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { - dn.ecShardsLock.Lock() - defer dn.ecShardsLock.Unlock() - - if existing, ok := dn.ecShards[s.VolumeId]; ok { - oldCount := existing.ShardBits.ShardIdCount() - existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) - delta := existing.ShardBits.ShardIdCount() - oldCount - dn.UpAdjustEcShardCountDelta(int64(delta)) - if existing.ShardBits.ShardIdCount() == 0 { - delete(dn.ecShards, s.VolumeId) - } - } - + disk := dn.getOrCreateDisk(s.DiskType) + disk.DeleteEcShard(s) } -func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { +func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) { - // check whether normal volumes has this volume id dn.RLock() - _, ok := dn.volumes[id] - if ok { - hasVolumeId = true - } - dn.RUnlock() - - if hasVolumeId { - return - } - - // check whether ec shards has this volume id - dn.ecShardsLock.RLock() - _, ok = dn.ecShards[id] - if ok { - hasVolumeId = true + defer dn.RUnlock() + for _, c := range dn.children { + disk := c.(*Disk) + if disk.HasVolumesById(volumeId) { + return true + } } - dn.ecShardsLock.RUnlock() - - return + return false } |
