aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/topology_ec.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/topology_ec.go')
-rw-r--r--weed/topology/topology_ec.go64
1 files changed, 63 insertions, 1 deletions
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
index 5592b9b64..8c8a0b67b 100644
--- a/weed/topology/topology_ec.go
+++ b/weed/topology/topology_ec.go
@@ -1,6 +1,7 @@
package topology
import (
+ "math"
"sort"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -9,6 +10,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
+const shardCount = erasure_coding.DataShardsCount + erasure_coding.ParityShardsCount
+
+type EcShardLocations struct {
+ Collection string
+ locations [shardCount][]*DataNode
+}
func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
// convert into in memory struct storage.VolumeInfo
@@ -16,7 +23,7 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
sort.Slice(shardInfos, func(i, j int) bool {
return shardInfos[i].Id < shardInfos[j].Id
})
- var prevVolumeId uint32
+ prevVolumeId := uint32(math.MaxUint32)
var ecVolumeInfo *erasure_coding.EcVolumeInfo
for _, shardInfo := range shardInfos {
if shardInfo.Id != prevVolumeId {
@@ -36,8 +43,63 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
return
}
+func NewEcShardLocations(collection string) *EcShardLocations {
+ return &EcShardLocations{
+ Collection: collection,
+ }
+}
+
+func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
+ dataNodes := loc.locations[shardId]
+ for _, n := range dataNodes {
+ if n.Id() == dn.Id() {
+ return false
+ }
+ }
+ loc.locations[shardId] = append(dataNodes, dn)
+ return true
+}
+
+func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
+ dataNodes := loc.locations[shardId]
+ foundIndex := -1
+ for index, n := range dataNodes {
+ if n.Id() == dn.Id() {
+ foundIndex = index
+ }
+ }
+ if foundIndex < 0 {
+ return false
+ }
+ loc.locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
+ return true
+}
+
func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
+
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+ if !found {
+ locations = NewEcShardLocations(ecShardInfos.Collection)
+ t.ecShardMap[ecShardInfos.VolumeId] = locations
+ }
+ for _, shardId := range ecShardInfos.ShardIds() {
+ locations.AddShard(shardId, dn)
+ }
}
+
func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
glog.Infof("removing ec shard info:%+v", ecShardInfos)
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+ if !found {
+ return
+ }
+ for _, shardId := range ecShardInfos.ShardIds() {
+ locations.DeleteShard(shardId, dn)
+ }
}