From 41e8ae61f80275a2d1ba49f36553f798cf8efe3a Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 25 May 2019 14:02:06 -0700
Subject: generate, balance, delete copied shards, delete old volume

---
 weed/shell/command_ec_encode.go | 109 ++++++++++++++++++++++++++++++----------
 1 file changed, 83 insertions(+), 26 deletions(-)

diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 4647c2507..c9ec2fcd2 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -51,7 +51,8 @@ func (c *commandEcEncode) Help() string {
 func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
 
 	encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
-	volumeId := encodeCommand.Int("vid", 0, "the volume id")
+	volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
+	collection := encodeCommand.String("collection", "", "the collection name")
 	if err = encodeCommand.Parse(args); err != nil {
 		return nil
 	}
@@ -65,13 +66,13 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
 	}
 
 	// generate ec shards
-	err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), locations[0].Url)
+	err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), *collection, locations[0].Url)
 	if err != nil {
 		return fmt.Errorf("generate ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err)
 	}
 
 	// balance the ec shards to current cluster
-	err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), locations[0])
+	err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), *collection, locations)
 	if err != nil {
 		return fmt.Errorf("balance ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err)
 	}
@@ -79,11 +80,12 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
 	return err
 }
 
-func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) error {
+func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
 
 	err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-		_, genErr := volumeServerClient.VolumeEcGenerateSlices(ctx, &volume_server_pb.VolumeEcGenerateSlicesRequest{
-			VolumeId: uint32(volumeId),
+		_, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
+			VolumeId:   uint32(volumeId),
+			Collection: collection,
 		})
 		return genErr
 	})
@@ -92,7 +94,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
 
 }
 
-func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) {
+func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
 
 	// list all possible locations
 	var resp *master_pb.VolumeListResponse
@@ -106,7 +108,7 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl
 
 	// find out all volume servers with one volume slot left.
 	var allDataNodes []*master_pb.DataNodeInfo
-	var totalFreeEcSlots int
+	var totalFreeEcSlots uint32
 	eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
 		if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
 			allDataNodes = append(allDataNodes, dn)
@@ -127,63 +129,118 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl
 	allocated := balancedEcDistribution(allDataNodes)
 
 	// ask the data nodes to copy from the source volume server
-	err = parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allDataNodes, allocated, volumeId, existingLocation)
+	copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allDataNodes, allocated, volumeId, collection, existingLocations[0])
 	if err != nil {
 		return nil
 	}
 
 	// ask the source volume server to clean up copied ec shards
+	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds)
+	if err != nil {
+		return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err)
+	}
 
 	// ask the source volume server to delete the original volume
+	for _, location := range existingLocations {
+		err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
+		if err != nil {
+			return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
+		}
+	}
 
 	return err
 
 }
 
 func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
-	targetServers []*master_pb.DataNodeInfo, allocated []int,
-	volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) {
+	targetServers []*master_pb.DataNodeInfo, allocated []uint32,
+	volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
 
 	// parallelize
+	shardIdChan := make(chan []uint32, len(targetServers))
 	var wg sync.WaitGroup
-	startFromShardId := 0
+	startFromShardId := uint32(0)
 	for i, server := range targetServers {
 		if allocated[i] <= 0 {
 			continue
 		}
 
 		wg.Add(1)
-		go func(server *master_pb.DataNodeInfo, startFromShardId int, shardCount int) {
+		go func(server *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32) {
 			defer wg.Done()
-			copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, startFromShardId, shardCount, volumeId, existingLocation)
+			copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
+				startFromShardId, shardCount, volumeId, collection, existingLocation)
 			if copyErr != nil {
 				err = copyErr
+			} else {
+				shardIdChan <- copiedShardIds
 			}
 		}(server, startFromShardId, allocated[i])
 		startFromShardId += allocated[i]
 	}
 	wg.Wait()
+	close(shardIdChan)
 
-	return err
+	if err != nil {
+		return nil, err
+	}
+
+	for shardIds := range shardIdChan {
+		actuallyCopied = append(actuallyCopied, shardIds...)
+	}
+
+	return
 }
 
 func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
-	targetServer *master_pb.DataNodeInfo, startFromShardId int, shardCount int,
-	volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) {
+	targetServer *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32,
+	volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) {
 
 	if targetServer.Id == existingLocation.Url {
-		return nil
+		return nil, nil
 	}
 
 	for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ {
 		fmt.Printf("copy %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id)
+		copiedShardIds = append(copiedShardIds, shardId)
 	}
 
-	return nil
+	err = operation.WithVolumeServerClient(targetServer.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+			VolumeId:       uint32(volumeId),
+			Collection:     collection,
+			EcIndexes:      copiedShardIds,
+			SourceDataNode: existingLocation.Url,
+		})
+		return copyErr
+	})
+
+	if err != nil {
+		return
+	}
+
+	return
 }
 
-func balancedEcDistribution(servers []*master_pb.DataNodeInfo) (allocated []int) {
-	freeSlots := make([]int, len(servers))
-	allocated = make([]int, len(servers))
+
+func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
+	volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error {
+
+	shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount
+
+	return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
+			VolumeId:        uint32(volumeId),
+			EcIndexes:       toBeDeletedShardIds,
+			ShouldDeleteEcx: shouldDeleteEcx,
+		})
+		return deleteErr
+	})
+
+}
+
+func balancedEcDistribution(servers []*master_pb.DataNodeInfo) (allocated []uint32) {
+	freeSlots := make([]uint32, len(servers))
+	allocated = make([]uint32, len(servers))
 	for i, server := range servers {
 		freeSlots[i] = countFreeShardSlots(server)
 	}
@@ -213,14 +270,14 @@ func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)
 	}
 }
 
-func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
+func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count uint32) {
 	for _, ecShardInfo := range ecShardInfos {
 		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
-		count += shardBits.ShardIdCount()
+		count += uint32(shardBits.ShardIdCount())
 	}
 	return
 }
 
-func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
-	return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
+func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count uint32) {
+	return uint32(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
 }
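For context on the balancing step above: countFreeShardSlots treats each free volume slot as room for ten EC shards (presumably because a data shard is roughly a tenth of a volume) and subtracts the shards a node already hosts. Below is a minimal standalone sketch of that arithmetic with hypothetical numbers; it mirrors the patch's helper but is not code from the patch itself.

package main

import "fmt"

// freeShardSlots mirrors the patch's countFreeShardSlots helper:
// each free volume slot is counted as space for 10 EC shards, and the
// shards already hosted on the data node are subtracted.
func freeShardSlots(freeVolumeCount, hostedEcShards uint32) uint32 {
	return freeVolumeCount*10 - hostedEcShards
}

func main() {
	// hypothetical data node: 3 free volume slots, 8 EC shards already placed
	fmt.Println(freeShardSlots(3, 8)) // 22 shard slots left
}

One caveat worth noting: with the switch to uint32, the subtraction wraps around if a node already hosts more shards than its free volume slots would cover, so such a node would appear to have a very large number of free slots rather than zero.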