aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-05-23 00:42:28 -0700
committerChris Lu <chris.lu@gmail.com>2019-05-23 00:42:28 -0700
commit4659d80035430bcbd6f1642311680af0f7a6f215 (patch)
tree3cdc36c2e6c4385fef9f8a59a5baccce84057df0
parente913ee380a88cbc35e8baa6015ceb15971a05d20 (diff)
downloadseaweedfs-4659d80035430bcbd6f1642311680af0f7a6f215.tar.xz
seaweedfs-4659d80035430bcbd6f1642311680af0f7a6f215.zip
prepare to register ec shard info in master
-rw-r--r--weed/server/master_grpc_server.go8
-rw-r--r--weed/storage/erasure_coding/ec_volume_info.go7
-rw-r--r--weed/topology/data_node.go4
-rw-r--r--weed/topology/data_node_ec.go8
-rw-r--r--weed/topology/topology_ec.go43
5 files changed, 65 insertions, 5 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 34962c83c..bd28a15c8 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -87,7 +87,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
// update master internal volume layouts
t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
- } else if len(heartbeat.Volumes) > 0 {
+ }
+
+ if len(heartbeat.Volumes) > 0 {
// process heartbeat.Volumes
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
@@ -99,7 +101,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
- } else if len(heartbeat.EcShards) > 0 {
+ }
+
+ if len(heartbeat.EcShards) > 0 {
glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
}
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index 2378ff4e9..9cebb7d40 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -12,6 +12,13 @@ type EcVolumeInfo struct {
shardIds uint16 // use bits to indicate the shard id
}
+func NewEcVolumeInfo(collection string, vid needle.VolumeId) *EcVolumeInfo {
+ return &EcVolumeInfo{
+ Collection: collection,
+ VolumeId: vid,
+ }
+}
+
func (ecInfo *EcVolumeInfo) AddShardId(id ShardId) {
ecInfo.shardIds |= (1 << id)
}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index ff160e178..dba323518 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -21,7 +21,7 @@ type DataNode struct {
Port int
PublicUrl string
LastSeen int64 // unix time in seconds
- ecShards map[needle.VolumeId]erasure_coding.EcVolumeInfo
+ ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
ecShardsLock sync.RWMutex
}
@@ -30,7 +30,7 @@ func NewDataNode(id string) *DataNode {
s.id = NodeId(id)
s.nodeType = "DataNode"
s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
- s.ecShards = make(map[needle.VolumeId]erasure_coding.EcVolumeInfo)
+ s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
s.NodeImpl.value = s
return s
}
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index d1715f557..5206a3b51 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -4,7 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
)
-func (dn *DataNode) GetEcShards() (ret []erasure_coding.EcVolumeInfo) {
+func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
dn.RLock()
for _, ecVolumeInfo := range dn.ecShards {
ret = append(ret, ecVolumeInfo)
@@ -12,3 +12,9 @@ func (dn *DataNode) GetEcShards() (ret []erasure_coding.EcVolumeInfo) {
dn.RUnlock()
return ret
}
+
+func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+ dn.ecShardsLock.Lock()
+ dn.ecShardsLock.Unlock()
+ return
+}
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
new file mode 100644
index 000000000..5592b9b64
--- /dev/null
+++ b/weed/topology/topology_ec.go
@@ -0,0 +1,43 @@
+package topology
+
+import (
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+
+func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
+ // convert into in memory struct storage.VolumeInfo
+ var shards []*erasure_coding.EcVolumeInfo
+ sort.Slice(shardInfos, func(i, j int) bool {
+ return shardInfos[i].Id < shardInfos[j].Id
+ })
+ var prevVolumeId uint32
+ var ecVolumeInfo *erasure_coding.EcVolumeInfo
+ for _, shardInfo := range shardInfos {
+ if shardInfo.Id != prevVolumeId {
+ ecVolumeInfo = erasure_coding.NewEcVolumeInfo(shardInfo.Collection, needle.VolumeId(shardInfo.Id))
+ shards = append(shards, ecVolumeInfo)
+ }
+ ecVolumeInfo.AddShardId(erasure_coding.ShardId(shardInfo.EcIndex))
+ }
+ // find out the delta volumes
+ newShards, deletedShards = dn.UpdateEcShards(shards)
+ for _, v := range newShards {
+ t.RegisterEcShards(v, dn)
+ }
+ for _, v := range deletedShards {
+ t.UnRegisterEcShards(v, dn)
+ }
+ return
+}
+
+func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
+}
+func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
+ glog.Infof("removing ec shard info:%+v", ecShardInfos)
+}