aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_encode.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_ec_encode.go')
-rw-r--r--weed/shell/command_ec_encode.go75
1 files changed, 41 insertions, 34 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 58527abf2..634cb11e2 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -54,6 +54,10 @@ func (c *commandEcEncode) Help() string {
func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
collection := encodeCommand.String("collection", "", "the collection name")
@@ -63,22 +67,21 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// volumeId is provided
if vid != 0 {
- return doEcEncode(ctx, commandEnv, *collection, vid)
+ return doEcEncode(commandEnv, *collection, vid)
}
// apply to all volumes in the collection
- volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
fmt.Printf("ec encode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
+ if err = doEcEncode(commandEnv, *collection, vid); err != nil {
return err
}
}
@@ -86,7 +89,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
-func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
@@ -96,19 +99,19 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string,
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly
- err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
// generate ec shards
- err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
+ err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].Url)
if err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
// balance the ec shards to current cluster
- err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
+ err = spreadEcShards(commandEnv, vid, collection, locations)
if err != nil {
return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
}
@@ -116,12 +119,14 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string,
return nil
}
-func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
+func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
for _, location := range locations {
+ fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
+
err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
+ _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
})
return markErr
@@ -136,10 +141,12 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol
return nil
}
-func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+
+ fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
+ _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -150,9 +157,9 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
}
-func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "")
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
if err != nil {
return err
}
@@ -169,26 +176,27 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
// ask the data nodes to copy from the source volume server
- copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
+ copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
if err != nil {
return err
}
// unmount the to be deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return err
}
// ask the source volume server to clean up copied ec shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
}
// ask the source volume server to delete the original volume
for _, location := range existingLocations {
- err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
+ fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
+ err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
if err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
}
@@ -198,9 +206,9 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
}
-func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
- targetServers []*EcNode, allocatedEcIds [][]uint32,
- volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
+func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
+
+ fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url)
// parallelize
shardIdChan := make(chan []uint32, len(targetServers))
@@ -213,7 +221,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
wg.Add(1)
go func(server *EcNode, allocatedEcShardIds []uint32) {
defer wg.Done()
- copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
+ copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
allocatedEcShardIds, volumeId, collection, existingLocation.Url)
if copyErr != nil {
err = copyErr
@@ -255,13 +263,10 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
return allocated
}
-func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
if err != nil {
return
}
@@ -269,14 +274,16 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, se
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
- fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
+ fmt.Printf("collect volumes quiet for: %d seconds\n", quietSeconds)
vidMap := make(map[uint32]bool)
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
- for _, v := range dn.VolumeInfos {
- if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
- if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
- vidMap[v.Id] = true
+ eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, diskInfo := range dn.DiskInfos {
+ for _, v := range diskInfo.VolumeInfos {
+ if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
+ if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
+ vidMap[v.Id] = true
+ }
}
}
}