Diffstat (limited to 'weed/shell/command_ec_encode.go')
-rw-r--r--   weed/shell/command_ec_encode.go   92
1 file changed, 45 insertions(+), 47 deletions(-)
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index f07cb93f9..6efb05488 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -8,13 +8,14 @@ import (
 	"sync"
 	"time"
 
+	"google.golang.org/grpc"
+
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
-	"google.golang.org/grpc"
 )
 
 func init() {
@@ -62,22 +63,21 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 		return nil
 	}
 
-	ctx := context.Background()
 	vid := needle.VolumeId(*volumeId)
 
 	// volumeId is provided
 	if vid != 0 {
-		return doEcEncode(ctx, commandEnv, *collection, vid)
+		return doEcEncode(commandEnv, *collection, vid)
 	}
 
 	// apply to all volumes in the collection
-	volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod)
+	volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
 	if err != nil {
 		return err
 	}
 	fmt.Printf("ec encode volumes: %v\n", volumeIds)
 	for _, vid := range volumeIds {
-		if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
+		if err = doEcEncode(commandEnv, *collection, vid); err != nil {
 			return err
 		}
 	}
@@ -85,27 +85,29 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	return nil
 }
 
-func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
 	// find volume location
 	locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
 	if !found {
 		return fmt.Errorf("volume %d not found", vid)
 	}
 
+	// fmt.Printf("found ec %d shards on %v\n", vid, locations)
+
 	// mark the volume as readonly
-	err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+	err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
 	if err != nil {
-		return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
+		return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
 	}
 
 	// generate ec shards
-	err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
+	err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
 	if err != nil {
 		return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
 	}
 
 	// balance the ec shards to current cluster
-	err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
+	err = spreadEcShards(commandEnv, vid, collection, locations)
 	if err != nil {
 		return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
 	}
@@ -113,12 +115,12 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string,
 	return nil
 }
 
-func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
+func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
 
 	for _, location := range locations {
 
 		err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-			_, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
+			_, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
 				VolumeId: uint32(volumeId),
 			})
 			return markErr
@@ -133,10 +135,10 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol
 	return nil
 }
 
-func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
 
 	err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-		_, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
+		_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
 			VolumeId:   uint32(volumeId),
 			Collection: collection,
 		})
@@ -147,9 +149,9 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
 
 }
 
-func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
 
-	allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "")
+	allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
 	if err != nil {
 		return err
 	}
@@ -163,29 +165,29 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
 	}
 
 	// calculate how many shards to allocate for these servers
-	allocated := balancedEcDistribution(allocatedDataNodes)
+	allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
 
 	// ask the data nodes to copy from the source volume server
-	copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0])
+	copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
 	if err != nil {
 		return err
 	}
 
 	// unmount the to be deleted shards
-	err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+	err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
 	if err != nil {
 		return err
 	}
 
 	// ask the source volume server to clean up copied ec shards
-	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
+	err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
 	if err != nil {
 		return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
 	}
 
 	// ask the source volume server to delete the original volume
 	for _, location := range existingLocations {
-		err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
+		err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
 		if err != nil {
 			return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
 		}
@@ -195,32 +197,28 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
 
 	return nil
 }
 
-func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
-	targetServers []*EcNode, allocated []int,
-	volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
+func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
 
 	// parallelize
 	shardIdChan := make(chan []uint32, len(targetServers))
 	var wg sync.WaitGroup
-	startFromShardId := uint32(0)
 	for i, server := range targetServers {
-		if allocated[i] <= 0 {
+		if len(allocatedEcIds[i]) <= 0 {
 			continue
 		}
 
 		wg.Add(1)
-		go func(server *EcNode, startFromShardId uint32, shardCount int) {
+		go func(server *EcNode, allocatedEcShardIds []uint32) {
 			defer wg.Done()
-			copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
-				startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
+			copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
+				allocatedEcShardIds, volumeId, collection, existingLocation.Url)
 			if copyErr != nil {
 				err = copyErr
 			} else {
 				shardIdChan <- copiedShardIds
 				server.addEcVolumeShards(volumeId, collection, copiedShardIds)
 			}
-		}(server, startFromShardId, allocated[i])
-		startFromShardId += uint32(allocated[i])
+		}(server, allocatedEcIds[i])
 	}
 	wg.Wait()
 	close(shardIdChan)
@@ -236,29 +234,29 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
 	return
 }
 
-func balancedEcDistribution(servers []*EcNode) (allocated []int) {
-	allocated = make([]int, len(servers))
-	allocatedCount := 0
-	for allocatedCount < erasure_coding.TotalShardsCount {
-		for i, server := range servers {
-			if server.freeEcSlot-allocated[i] > 0 {
-				allocated[i] += 1
-				allocatedCount += 1
-			}
-			if allocatedCount >= erasure_coding.TotalShardsCount {
-				break
-			}
+func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
+	allocated = make([][]uint32, len(servers))
+	allocatedShardIdIndex := uint32(0)
+	serverIndex := 0
+	for allocatedShardIdIndex < erasure_coding.TotalShardsCount {
+		if servers[serverIndex].freeEcSlot > 0 {
+			allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex)
+			allocatedShardIdIndex++
+		}
+		serverIndex++
+		if serverIndex >= len(servers) {
+			serverIndex = 0
 		}
 	}
 
 	return allocated
 }
 
-func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
 
 	var resp *master_pb.VolumeListResponse
-	err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
-		resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
 		return err
 	})
 	if err != nil {
@@ -281,7 +279,7 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, se
 		}
 	})
 
-	for vid, _ := range vidMap {
+	for vid := range vidMap {
 		vids = append(vids, needle.VolumeId(vid))
 	}
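
Note on the balancedEcDistribution change: the function now returns the explicit shard ids assigned to each server ([][]uint32) rather than per-server counts, handing ids out round-robin and skipping servers that report no free slots. Below is a minimal standalone sketch of the new allocation loop; ecNode and totalShardsCount are local stand-ins for the real shell.EcNode and erasure_coding.TotalShardsCount, used here only for illustration.

package main

import "fmt"

// Stand-in for erasure_coding.TotalShardsCount (10 data + 4 parity shards).
const totalShardsCount = 14

// Stand-in for the real EcNode; only the free-slot count matters here.
type ecNode struct {
	id         string
	freeEcSlot int
}

// balancedEcDistribution visits servers round-robin, assigning one shard id
// per visit to any server that still reports a free slot.
func balancedEcDistribution(servers []*ecNode) (allocated [][]uint32) {
	allocated = make([][]uint32, len(servers))
	shardId := uint32(0)
	serverIndex := 0
	for shardId < totalShardsCount {
		if servers[serverIndex].freeEcSlot > 0 {
			allocated[serverIndex] = append(allocated[serverIndex], shardId)
			shardId++
		}
		serverIndex++
		if serverIndex >= len(servers) {
			serverIndex = 0
		}
	}
	return allocated
}

func main() {
	servers := []*ecNode{
		{id: "vs1", freeEcSlot: 10},
		{id: "vs2", freeEcSlot: 0}, // a full server receives nothing
		{id: "vs3", freeEcSlot: 10},
	}
	for i, ids := range balancedEcDistribution(servers) {
		fmt.Printf("%s -> %v\n", servers[i].id, ids)
	}
	// vs1 -> [0 2 4 6 8 10 12]
	// vs2 -> []
	// vs3 -> [1 3 5 7 9 11 13]
}

Two caveats are visible in the diff itself: the loop would spin forever if every server reported zero free slots (presumably guarded by the totalFreeEcSlots check in the calling spreadEcShards), and unlike the old version, freeEcSlot is not decremented as ids are appended, so a server with a single free slot can still be assigned several shards across passes.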
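Similarly, parallelCopyEcShardsFromSource keeps its fan-out shape (one goroutine per target server, results gathered over a channel buffered to the server count) but each goroutine now receives its exact shard-id slice instead of a (startFromShardId, shardCount) range. A self-contained sketch of that collection pattern, with the gRPC copy call replaced by a hypothetical copyShards stub:

package main

import (
	"fmt"
	"sync"
)

// copyShards stands in for oneServerCopyAndMountEcShardsFromSource;
// here it simply reports every requested shard as copied.
func copyShards(server string, shardIds []uint32) ([]uint32, error) {
	return shardIds, nil
}

func parallelCopy(servers []string, allocatedEcIds [][]uint32) (copied []uint32, err error) {
	// Buffered to len(servers) so no goroutine ever blocks on send.
	shardIdChan := make(chan []uint32, len(servers))
	var wg sync.WaitGroup
	for i, server := range servers {
		if len(allocatedEcIds[i]) == 0 {
			continue // nothing allocated to this server
		}
		wg.Add(1)
		go func(server string, shardIds []uint32) {
			defer wg.Done()
			copiedShardIds, copyErr := copyShards(server, shardIds)
			if copyErr != nil {
				err = copyErr // last-writer-wins, mirroring the original
			} else {
				shardIdChan <- copiedShardIds
			}
		}(server, allocatedEcIds[i])
	}
	wg.Wait()
	close(shardIdChan)
	if err != nil {
		return nil, err
	}
	for shardIds := range shardIdChan {
		copied = append(copied, shardIds...)
	}
	return copied, nil
}

func main() {
	copied, _ := parallelCopy(
		[]string{"vs1", "vs2"},
		[][]uint32{{0, 2}, {1, 3}},
	)
	fmt.Println(copied) // e.g. [0 2 1 3]; order depends on goroutine scheduling
}

As in the diff, the write to the shared err from inside the goroutines is unsynchronized; a stricter version would guard it with a mutex or use golang.org/x/sync/errgroup.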
