aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-05-23 23:34:29 -0700
committerChris Lu <chris.lu@gmail.com>2019-05-23 23:34:29 -0700
commit8838176d822927a2bd17c5d51fab9ff6ff1fcb58 (patch)
tree1fb67fba2c93943a31cb83ef0b9acaa45c72124c
parent8a96445f402a240956adf081de3a675e2e41c767 (diff)
downloadseaweedfs-8838176d822927a2bd17c5d51fab9ff6ff1fcb58.tar.xz
seaweedfs-8838176d822927a2bd17c5d51fab9ff6ff1fcb58.zip
register ec shards to topology
-rw-r--r--weed/topology/data_node_ec.go10
-rw-r--r--weed/topology/topology.go6
-rw-r--r--weed/topology/topology_ec.go64
3 files changed, 73 insertions, 7 deletions
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index ef395dbdb..e8ead5511 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -18,23 +18,23 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
// prepare the new ec shard map
actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
for _, ecShards := range actualShards {
- actualEcShardMap[ecShards.VolumeId]= ecShards
+ actualEcShardMap[ecShards.VolumeId] = ecShards
}
// found out the newShards and deletedShards
dn.ecShardsLock.RLock()
- for vid, ecShards := range dn.ecShards{
+ for vid, ecShards := range dn.ecShards {
if actualEcShards, ok := actualEcShardMap[vid]; !ok {
// dn registered ec shards not found in the new set of ec shards
deletedShards = append(deletedShards, ecShards)
} else {
// found, but maybe the actual shard could be missing
a := actualEcShards.Minus(ecShards)
- if len(a.ShardIds())>0 {
+ if len(a.ShardIds()) > 0 {
newShards = append(newShards, a)
}
d := ecShards.Minus(actualEcShards)
- if len(d.ShardIds())>0 {
+ if len(d.ShardIds()) > 0 {
deletedShards = append(deletedShards, d)
}
}
@@ -46,7 +46,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
}
dn.ecShardsLock.RUnlock()
- if len(newShards)>0 || len(deletedShards)>0{
+ if len(newShards) > 0 || len(deletedShards) > 0 {
// if changed, set to the new ec shard map
dn.ecShardsLock.Lock()
dn.ecShards = actualEcShardMap
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index e463aae29..667846f02 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math/rand"
+ "sync"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -17,7 +18,9 @@ import (
type Topology struct {
NodeImpl
- collectionMap *util.ConcurrentReadMap
+ collectionMap *util.ConcurrentReadMap
+ ecShardMap map[needle.VolumeId]*EcShardLocations
+ ecShardMapLock sync.RWMutex
pulse int64
@@ -39,6 +42,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
t.collectionMap = util.NewConcurrentReadMap()
+ t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
index 5592b9b64..8c8a0b67b 100644
--- a/weed/topology/topology_ec.go
+++ b/weed/topology/topology_ec.go
@@ -1,6 +1,7 @@
package topology
import (
+ "math"
"sort"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -9,6 +10,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
+const shardCount = erasure_coding.DataShardsCount + erasure_coding.ParityShardsCount
+
+type EcShardLocations struct {
+ Collection string
+ locations [shardCount][]*DataNode
+}
func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
// convert into in memory struct storage.VolumeInfo
@@ -16,7 +23,7 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
sort.Slice(shardInfos, func(i, j int) bool {
return shardInfos[i].Id < shardInfos[j].Id
})
- var prevVolumeId uint32
+ prevVolumeId := uint32(math.MaxUint32)
var ecVolumeInfo *erasure_coding.EcVolumeInfo
for _, shardInfo := range shardInfos {
if shardInfo.Id != prevVolumeId {
@@ -36,8 +43,63 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
return
}
+func NewEcShardLocations(collection string) *EcShardLocations {
+ return &EcShardLocations{
+ Collection: collection,
+ }
+}
+
+func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
+ dataNodes := loc.locations[shardId]
+ for _, n := range dataNodes {
+ if n.Id() == dn.Id() {
+ return false
+ }
+ }
+ loc.locations[shardId] = append(dataNodes, dn)
+ return true
+}
+
+func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
+ dataNodes := loc.locations[shardId]
+ foundIndex := -1
+ for index, n := range dataNodes {
+ if n.Id() == dn.Id() {
+ foundIndex = index
+ }
+ }
+ if foundIndex < 0 {
+ return false
+ }
+ loc.locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
+ return true
+}
+
func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
+
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+ if !found {
+ locations = NewEcShardLocations(ecShardInfos.Collection)
+ t.ecShardMap[ecShardInfos.VolumeId] = locations
+ }
+ for _, shardId := range ecShardInfos.ShardIds() {
+ locations.AddShard(shardId, dn)
+ }
}
+
func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
glog.Infof("removing ec shard info:%+v", ecShardInfos)
+ t.ecShardMapLock.Lock()
+ defer t.ecShardMapLock.Unlock()
+
+ locations, found := t.ecShardMap[ecShardInfos.VolumeId]
+ if !found {
+ return
+ }
+ for _, shardId := range ecShardInfos.ShardIds() {
+ locations.DeleteShard(shardId, dn)
+ }
}