path: root/weed/shell/command_ec_encode.go
author     Chris Lu <chris.lu@gmail.com>   2019-06-03 02:26:31 -0700
committer  Chris Lu <chris.lu@gmail.com>   2019-06-03 02:26:31 -0700
commit     7e80b2b8823a9bb8bac58100a76d6a5825c94be4 (patch)
tree       f406744ba5e7302157de46a59b4e5c09abff067f /weed/shell/command_ec_encode.go
parent     55be09996d8f82e461e1c464db82707c982b2b57 (diff)
fix multiple bugs: return the parallel-copy error instead of swallowing it, unmount copied ec shards on the source before deleting them, pass the collection to the shard delete RPC, and compute the quiet period in whole seconds
Diffstat (limited to 'weed/shell/command_ec_encode.go')
-rw-r--r--  weed/shell/command_ec_encode.go  161
1 file changed, 19 insertions(+), 142 deletions(-)
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 14d1ae96b..94265a874 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -5,11 +5,9 @@ import (
"flag"
"fmt"
"io"
- "sort"
"sync"
"time"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -73,6 +71,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
if err != nil {
return err
}
+ fmt.Printf("ec encode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
return err
@@ -96,9 +95,9 @@ func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string,
}
// balance the ec shards to current cluster
- err = balanceEcShards(ctx, commandEnv, vid, collection, locations)
+ err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
if err != nil {
- return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
+ return fmt.Errorf("spread ec shards for volume %d to %s: %v", vid, locations[0].Url, err)
}
return nil
@@ -118,7 +117,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
}
-func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv)
if err != nil {
@@ -139,13 +138,19 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl
// ask the data nodes to copy from the source volume server
copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0])
if err != nil {
- return nil
+ return err
+ }
+
+ // unmount the to be deleted shards
+ err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+ if err != nil {
+ return err
}
// ask the source volume server to clean up copied ec shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
- return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err)
+ return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
}
// ask the source volume server to delete the original volume
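This hunk fixes three bugs at once: the parallel-copy error was silently swallowed (`return nil` on failure), the copied shards were deleted from the source without being unmounted first, and the delete RPC lacked the collection name. Below is a minimal sketch of the corrected post-copy sequence as it would sit inside the shell package, reusing the helper signatures shown in this diff; the wrapper name finishEcEncode is hypothetical, for illustration only.

func finishEcEncode(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId,
	collection string, source wdclient.Location, copiedShardIds []uint32) error {

	// Unmount the copied shards on the source first, so it stops
	// serving reads from shards that are about to be deleted.
	if err := unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, source.Url, copiedShardIds); err != nil {
		return err
	}

	// Only then delete them; the collection now rides along so the
	// source server can locate the shard files on disk.
	if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, source.Url, copiedShardIds); err != nil {
		return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", source.Url, volumeId, copiedShardIds, err)
	}
	return nil
}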
@@ -176,7 +181,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
wg.Add(1)
go func(server *EcNode, startFromShardId uint32, shardCount int) {
defer wg.Done()
- copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
+ copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
if copyErr != nil {
err = copyErr
@@ -201,81 +206,12 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
return
}
-func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
- targetServer *EcNode, startFromShardId uint32, shardCount int,
- volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
-
- var shardIdsToCopy []uint32
- for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
- fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id)
- shardIdsToCopy = append(shardIdsToCopy, shardId)
- }
-
- err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-
- if targetServer.info.Id != existingLocation {
-
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
- VolumeId: uint32(volumeId),
- Collection: collection,
- ShardIds: shardIdsToCopy,
- SourceDataNode: existingLocation,
- })
- if copyErr != nil {
- return copyErr
- }
- }
-
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
- VolumeId: uint32(volumeId),
- Collection: collection,
- ShardIds: shardIdsToCopy,
- })
- if mountErr != nil {
- return mountErr
- }
-
- if targetServer.info.Id != existingLocation {
- copiedShardIds = shardIdsToCopy
- glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
- }
-
- return nil
- })
-
- if err != nil {
- return
- }
-
- return
-}
-
-func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
-
- shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount
-
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
- VolumeId: uint32(volumeId),
- ShardIds: toBeDeletedShardIds,
- ShouldDeleteEcx: shouldDeleteEcx,
- })
- return deleteErr
- })
-
-}
-
func balancedEcDistribution(servers []*EcNode) (allocated []int) {
- freeSlots := make([]int, len(servers))
allocated = make([]int, len(servers))
- for i, server := range servers {
- freeSlots[i] = countFreeShardSlots(server.info)
- }
allocatedCount := 0
for allocatedCount < erasure_coding.TotalShardsCount {
- for i, _ := range servers {
- if freeSlots[i]-allocated[i] > 0 {
+ for i, server := range servers {
+ if server.freeEcSlot-allocated[i] > 0 {
allocated[i] += 1
allocatedCount += 1
}
@@ -288,67 +224,6 @@ func balancedEcDistribution(servers []*EcNode) (allocated []int) {
return allocated
}
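The rewritten balancedEcDistribution drops the redundant freeSlots recomputation (each EcNode already carries a precomputed freeEcSlot, filled in by collectEcNodes) and replaces the non-idiomatic `for i, _ := range` with `for i, server := range`. The allocation itself is a simple round-robin, sketched standalone below with hypothetical names (ecNodeSketch, balancedDistributionSketch); like the original, it would spin forever if total capacity were short, which the caller is expected to rule out by checking totalFreeEcSlots first.

// Round-robin shard placement: repeatedly sweep the servers, granting
// one shard to any server that still has spare capacity, until all
// shards are placed.
type ecNodeSketch struct {
	freeEcSlot int // precomputed free EC shard slots on this node
}

func balancedDistributionSketch(servers []*ecNodeSketch, totalShards int) []int {
	allocated := make([]int, len(servers))
	allocatedCount := 0
	for allocatedCount < totalShards {
		for i, server := range servers {
			if server.freeEcSlot-allocated[i] > 0 {
				allocated[i]++
				allocatedCount++
			}
		}
	}
	return allocated
}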
-func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) {
- for _, dc := range topo.DataCenterInfos {
- for _, rack := range dc.RackInfos {
- for _, dn := range rack.DataNodeInfos {
- fn(dn)
- }
- }
- }
-}
-
-func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
- for _, ecShardInfo := range ecShardInfos {
- shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
- count += shardBits.ShardIdCount()
- }
- return
-}
-
-func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
- return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
-}
-
-type EcNode struct {
- info *master_pb.DataNodeInfo
- freeEcSlot int
-}
-
-func sortEcNodes(ecNodes []*EcNode) {
- sort.Slice(ecNodes, func(i, j int) bool {
- return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
- })
-}
-
-func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
-
- // list all possible locations
- var resp *master_pb.VolumeListResponse
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
- return err
- })
- if err != nil {
- return nil, 0, err
- }
-
- // find out all volume servers with one slot left.
- eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
- if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
- ecNodes = append(ecNodes, &EcNode{
- info: dn,
- freeEcSlot: int(freeEcSlots),
- })
- totalFreeEcSlots += freeEcSlots
- }
- })
-
- sortEcNodes(ecNodes)
-
- return
-}
-
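Note that the helpers deleted in this hunk (eachDataNode, countShards, countFreeShardSlots, EcNode, sortEcNodes, collectEcNodes) are still called by the code above — spreadEcShards starts with collectEcNodes, and balancedEcDistribution reads EcNode.freeEcSlot — so this is presumably a relocation into a shared file touched elsewhere in this multi-file commit, not a removal. The slot accounting they implement, compressed into one hypothetical helper:

// From the removed countFreeShardSlots: each free volume slot on a
// data node is counted as room for 10 EC shards (a shard is roughly
// a tenth of a volume), minus the shards the node already stores.
func freeEcShardSlots(freeVolumeCount int64, existingShardCount int) int {
	return int(freeVolumeCount)*10 - existingShardCount
}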
func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
var resp *master_pb.VolumeListResponse
@@ -360,9 +235,11 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, se
return
}
- quietSeconds := int64((quietPeriod * time.Second).Seconds())
+ quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
+ fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
+
vidMap := make(map[uint32]bool)
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, r := range dc.RackInfos {
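The quiet-period fix deserves a note: quietPeriod is already a time.Duration (an int64 nanosecond count), so the old `(quietPeriod * time.Second).Seconds()` scaled it by another 1e9 — and overflows int64 for periods beyond roughly 9.2 seconds. Integer division by time.Second yields the intended whole-second count. A self-contained demonstration:

package main

import (
	"fmt"
	"time"
)

func main() {
	quietPeriod := 3 * time.Second // 3e9 nanoseconds

	// Old expression: multiplies the nanosecond count by 1e9 before
	// converting back, yielding 3,000,000,000 "seconds".
	wrong := int64((quietPeriod * time.Second).Seconds())

	// Fixed expression: integer-divide nanoseconds by one second.
	right := int64(quietPeriod / time.Second)

	fmt.Println(wrong, right) // 3000000000 3
}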