Diffstat (limited to 'weed/topology/data_node_ec.go')
-rw-r--r--    weed/topology/data_node_ec.go    124
1 file changed, 66 insertions(+), 58 deletions(-)
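This commit moves ec shard bookkeeping off the DataNode itself and onto its per-disk children, so each hunk below operates on *Disk nodes looked up via getOrCreateDisk. The rough shape assumed throughout is the following sketch, presumed to live in package topology; the field and type names are inferred from the hunks below and are not copied from the real struct definitions:

// Sketch (assumption): the layout the hunks below rely on. Each child of a
// DataNode is a *Disk keyed by its disk type, and ec shards are tracked per disk.
type Disk struct {
	NodeImpl // assumed embedding; would supply Lock/RLock and the children map
	ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
}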
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index 75c8784fe..df1b6d658 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -3,12 +3,14 @@ package topology
import (
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
dn.RLock()
- for _, ecVolumeInfo := range dn.ecShards {
- ret = append(ret, ecVolumeInfo)
+ for _, c := range dn.children {
+ disk := c.(*Disk)
+ ret = append(ret, disk.GetEcShards()...)
}
dn.RUnlock()
return ret
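The rewritten GetEcShards no longer reads a DataNode-level map; it concatenates whatever each child Disk reports. A minimal sketch of the per-disk accessor it presumably calls, assuming the disk guards its own map with the node's read lock (the real implementation lives with the Disk code, not in this file):

// Sketch (assumption): per-disk counterpart of the loop above, in package topology.
func (d *Disk) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
	d.RLock()
	for _, ecVolumeInfo := range d.ecShards {
		ret = append(ret, ecVolumeInfo)
	}
	d.RUnlock()
	return ret
}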
@@ -21,10 +23,17 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
actualEcShardMap[ecShards.VolumeId] = ecShards
}
+ existingEcShards := dn.GetEcShards()
+
// figure out the newShards and deletedShards
var newShardCount, deletedShardCount int
- dn.ecShardsLock.RLock()
- for vid, ecShards := range dn.ecShards {
+ for _, ecShards := range existingEcShards {
+
+ disk := dn.getOrCreateDisk(ecShards.DiskType)
+ deltaDiskUsages := newDiskUsages()
+ deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
+
+ vid := ecShards.VolumeId
if actualEcShards, ok := actualEcShardMap[vid]; !ok {
// ec shards registered on dn but not found in the new set of ec shards
deletedShards = append(deletedShards, ecShards)
@@ -42,26 +51,61 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
deletedShardCount += d.ShardIdCount()
}
}
+
+ deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
+ disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+
}
for _, ecShards := range actualShards {
- if _, found := dn.ecShards[ecShards.VolumeId]; !found {
+
+ disk := dn.getOrCreateDisk(ecShards.DiskType)
+ deltaDiskUsages := newDiskUsages()
+ deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
+
+ if !dn.hasEcShards(ecShards.VolumeId) {
newShards = append(newShards, ecShards)
newShardCount += ecShards.ShardIdCount()
}
+
+ deltaDiskUsage.ecShardCount = int64(newShardCount)
+ disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+
}
- dn.ecShardsLock.RUnlock()
if len(newShards) > 0 || len(deletedShards) > 0 {
// if changed, set to the new ec shard map
- dn.ecShardsLock.Lock()
- dn.ecShards = actualEcShardMap
- dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount))
- dn.ecShardsLock.Unlock()
+ dn.doUpdateEcShards(actualShards)
}
return
}
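UpdateEcShards still returns the per-volume additions and removals; the difference is that disk usage deltas are now pushed up through each Disk via UpAdjustDiskUsageDelta, and the shard maps are only rewritten (doUpdateEcShards) when something actually changed. A hedged sketch of the caller's side; reportedShards and the follow-up handling are hypothetical, only the return values mirror the function above:

// Sketch (assumption): how a heartbeat-style caller might consume the result.
newShards, deletedShards := dn.UpdateEcShards(reportedShards)
if len(newShards) > 0 || len(deletedShards) > 0 {
	// a caller would typically propagate these to the ec shard location
	// registry: register newShards, unregister deletedShards
}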
+func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
+ dn.RLock()
+ defer dn.RUnlock()
+ for _, c := range dn.children {
+ disk := c.(*Disk)
+ _, found = disk.ecShards[volumeId]
+ if found {
+ return
+ }
+ }
+ return
+}
+
+func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) {
+ dn.Lock()
+ for _, c := range dn.children {
+ disk := c.(*Disk)
+ disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
+ }
+ for _, shard := range actualShards {
+ disk := dn.getOrCreateDisk(shard.DiskType)
+ disk.ecShards[shard.VolumeId] = shard
+ }
+ dn.Unlock()
+}
+
func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
for _, newShard := range newShards {
@@ -75,61 +119,25 @@ func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_codi
}
func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
- dn.ecShardsLock.Lock()
- defer dn.ecShardsLock.Unlock()
-
- delta := 0
- if existing, ok := dn.ecShards[s.VolumeId]; !ok {
- dn.ecShards[s.VolumeId] = s
- delta = s.ShardBits.ShardIdCount()
- } else {
- oldCount := existing.ShardBits.ShardIdCount()
- existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
- delta = existing.ShardBits.ShardIdCount() - oldCount
- }
-
- dn.UpAdjustEcShardCountDelta(int64(delta))
-
+ disk := dn.getOrCreateDisk(s.DiskType)
+ disk.AddOrUpdateEcShard(s)
}
func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
- dn.ecShardsLock.Lock()
- defer dn.ecShardsLock.Unlock()
-
- if existing, ok := dn.ecShards[s.VolumeId]; ok {
- oldCount := existing.ShardBits.ShardIdCount()
- existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
- delta := existing.ShardBits.ShardIdCount() - oldCount
- dn.UpAdjustEcShardCountDelta(int64(delta))
- if existing.ShardBits.ShardIdCount() == 0 {
- delete(dn.ecShards, s.VolumeId)
- }
- }
-
+ disk := dn.getOrCreateDisk(s.DiskType)
+ disk.DeleteEcShard(s)
}
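AddOrUpdateEcShard and DeleteEcShard now just resolve the right Disk and delegate. The removed DataNode-level ShardBits bookkeeping presumably reappears on Disk; a sketch of what the per-disk add path could look like, reusing the removed logic and the delta-usage pattern from UpdateEcShards (the locking details are assumptions, not the actual Disk implementation):

// Sketch (assumption): per-disk counterpart of the removed DataNode logic.
func (d *Disk) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
	d.Lock()
	defer d.Unlock()

	delta := 0
	if existing, ok := d.ecShards[s.VolumeId]; !ok {
		d.ecShards[s.VolumeId] = s
		delta = s.ShardBits.ShardIdCount()
	} else {
		oldCount := existing.ShardBits.ShardIdCount()
		existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
		delta = existing.ShardBits.ShardIdCount() - oldCount
	}

	// push only the ec shard delta up the topology tree
	deltaDiskUsages := newDiskUsages()
	deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(s.DiskType))
	deltaDiskUsage.ecShardCount = int64(delta)
	d.UpAdjustDiskUsageDelta(deltaDiskUsages)
}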
-func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) {
+func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) {
- // check whether normal volumes has this volume id
dn.RLock()
- _, ok := dn.volumes[id]
- if ok {
- hasVolumeId = true
- }
- dn.RUnlock()
-
- if hasVolumeId {
- return
- }
-
- // check whether ec shards has this volume id
- dn.ecShardsLock.RLock()
- _, ok = dn.ecShards[id]
- if ok {
- hasVolumeId = true
+ defer dn.RUnlock()
+ for _, c := range dn.children {
+ disk := c.(*Disk)
+ if disk.HasVolumesById(volumeId) {
+ return true
+ }
}
- dn.ecShardsLock.RUnlock()
-
- return
+ return false
}
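HasVolumesById now answers the combined question in one pass over the disks: is this volume id present either as a normal volume or as at least one ec shard on any disk of this node. Callers are unaffected; a trivial sketch (the concrete volume id is made up):

// Sketch: the lookup covers both normal volumes and ec shards per disk.
if dn.HasVolumesById(needle.VolumeId(123)) {
	// this data node serves volume 123 in some form
}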