aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/master_grpc_server.go7
-rw-r--r--weed/storage/erasure_coding/ec_volume_info.go4
-rw-r--r--weed/topology/data_node_ec.go37
-rw-r--r--weed/topology/topology_ec.go29
4 files changed, 77 insertions, 0 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index bc6463251..69bd56df0 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -103,6 +103,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
}
+ if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
+ // TODO send out the delta
+
+ // update master internal volume layouts
+ t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+ }
+
if len(heartbeat.EcShards) > 0 {
glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index ef8cc4ed4..48f4713d9 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -91,3 +91,7 @@ func (b ShardBits) ShardIdCount() (count int) {
func (b ShardBits) Minus(other ShardBits) (ShardBits) {
return b &^ other
}
+
+func (b ShardBits) Plus(other ShardBits) (ShardBits) {
+ return b | other
+}
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index 63c8f2127..95635331b 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -55,3 +55,40 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
return
}
+
+func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+
+ for _, newShard := range newShards {
+ dn.AddOrUpdateEcShard(newShard)
+ }
+
+ for _, deletedShard := range deletedShards {
+ dn.DeleteEcShard(deletedShard)
+ }
+
+}
+
+func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
+ dn.ecShardsLock.Lock()
+ defer dn.ecShardsLock.Unlock()
+
+ if existing, ok := dn.ecShards[s.VolumeId]; !ok {
+ dn.ecShards[s.VolumeId] = s
+ } else {
+ existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
+ }
+
+}
+
+func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
+ dn.ecShardsLock.Lock()
+ defer dn.ecShardsLock.Unlock()
+
+ if existing, ok := dn.ecShards[s.VolumeId]; ok {
+ existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
+ if existing.ShardBits.ShardIdCount() == 0 {
+ delete(dn.ecShards, s.VolumeId)
+ }
+ }
+
+}
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
index 050a0b901..61de86753 100644
--- a/weed/topology/topology_ec.go
+++ b/weed/topology/topology_ec.go
@@ -33,6 +33,35 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
return
}
+func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
+ // convert into in memory struct storage.VolumeInfo
+ var newShards, deletedShards []*erasure_coding.EcVolumeInfo
+ for _, shardInfo := range newEcShards {
+ newShards = append(newShards,
+ erasure_coding.NewEcVolumeInfo(
+ shardInfo.Collection,
+ needle.VolumeId(shardInfo.Id),
+ erasure_coding.ShardBits(shardInfo.EcIndexBits)))
+ }
+ for _, shardInfo := range deletedEcShards {
+ deletedShards = append(deletedShards,
+ erasure_coding.NewEcVolumeInfo(
+ shardInfo.Collection,
+ needle.VolumeId(shardInfo.Id),
+ erasure_coding.ShardBits(shardInfo.EcIndexBits)))
+ }
+
+ dn.DeltaUpdateEcShards(newShards, deletedShards)
+
+ for _, v := range newShards {
+ t.RegisterEcShards(v, dn)
+ }
+ for _, v := range deletedShards {
+ t.UnRegisterEcShards(v, dn)
+ }
+ return
+}
+
func NewEcShardLocations(collection string) *EcShardLocations {
return &EcShardLocations{
Collection: collection,