| author | Chris Lu <chris.lu@gmail.com> | 2019-06-03 02:26:31 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-06-03 02:26:31 -0700 |
| commit | 7e80b2b8823a9bb8bac58100a76d6a5825c94be4 | (patch) |
| tree | f406744ba5e7302157de46a59b4e5c09abff067f | /weed/shell |
| parent | 55be09996d8f82e461e1c464db82707c982b2b57 | (diff) |
| download | seaweedfs-7e80b2b8823a9bb8bac58100a76d6a5825c94be4.tar.xz | seaweedfs-7e80b2b8823a9bb8bac58100a76d6a5825c94be4.zip |
fix multiple bugs
Diffstat (limited to 'weed/shell')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | weed/shell/command_ec_balance.go | 25 |
| -rw-r--r-- | weed/shell/command_ec_common.go | 198 |
| -rw-r--r-- | weed/shell/command_ec_encode.go | 161 |
| -rw-r--r-- | weed/shell/command_ec_rebuild.go | 272 |
| -rw-r--r-- | weed/shell/command_volume_balance.go | 4 |
5 files changed, 494 insertions, 166 deletions
```diff
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 7118f2195..0934c79fd 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -56,9 +56,9 @@ func (c *commandEcBalance) Help() string {
 func (c *commandEcBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
 
 	balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
-	collection := balanceCommand.String("c", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+	collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
 	dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
-	applyBalancing := balanceCommand.Bool("f", false, "apply the balancing plan")
+	applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan")
 	if err = balanceCommand.Parse(args); err != nil {
 		return nil
 	}
@@ -211,7 +211,7 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, a
 
 		fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
 
-		err := moveOneShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
+		err := moveMountedShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
 		if err != nil {
 			return err
 		}
@@ -223,25 +223,6 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, a
 	return nil
 }
 
-func moveOneShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
-
-	fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
-
-	if !applyBalancing {
-		return nil
-	}
-
-	// ask destination node to copy shard and the ecx file from source node, and mount it
-	copiedShardIds, err := oneServerCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
-	if err != nil {
-		return err
-	}
-
-	// ask source node to delete the shard, and maybe the ecx file
-	return sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
-
-}
-
 func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
 
 	for _, shardInfo := range ecNode.info.EcShardInfos {
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
new file mode 100644
index 000000000..0cbf694cd
--- /dev/null
+++ b/weed/shell/command_ec_common.go
@@ -0,0 +1,198 @@
+package shell
+
+import (
+	"context"
+	"fmt"
+	"sort"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"google.golang.org/grpc"
+)
+
+func moveMountedShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
+
+	fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+
+	if !applyBalancing {
+		return nil
+	}
+
+	// ask destination node to copy shard and the ecx file from source node, and mount it
+	copiedShardIds, err := oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
+	if err != nil {
+		return err
+	}
+
+	// unmount the to be deleted shards
+	err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+	if err != nil {
+		return err
+	}
+
+	// ask source node to delete the shard, and maybe the ecx file
+	return sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+
+}
+
+func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
+	targetServer *EcNode, startFromShardId uint32, shardCount int,
+	volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
+
+	var shardIdsToCopy []uint32
+	for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
+		shardIdsToCopy = append(shardIdsToCopy, shardId)
+	}
+	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
+
+	err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+
+		if targetServer.info.Id != existingLocation {
+
+			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
+			_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+				VolumeId:       uint32(volumeId),
+				Collection:     collection,
+				ShardIds:       shardIdsToCopy,
+				CopyEcxFile:    true,
+				SourceDataNode: existingLocation,
+			})
+			if copyErr != nil {
+				return copyErr
+			}
+		}
+
+		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
+		_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+			VolumeId:   uint32(volumeId),
+			Collection: collection,
+			ShardIds:   shardIdsToCopy,
+		})
+		if mountErr != nil {
+			return mountErr
+		}
+
+		if targetServer.info.Id != existingLocation {
+			copiedShardIds = shardIdsToCopy
+			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return
+	}
+
+	return
+}
+
+func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) {
+	for _, dc := range topo.DataCenterInfos {
+		for _, rack := range dc.RackInfos {
+			for _, dn := range rack.DataNodeInfos {
+				fn(dn)
+			}
+		}
+	}
+}
+
+func sortEcNodes(ecNodes []*EcNode) {
+	sort.Slice(ecNodes, func(i, j int) bool {
+		return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
+	})
+}
+
+func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
+	for _, ecShardInfo := range ecShardInfos {
+		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+		count += shardBits.ShardIdCount()
+	}
+	return
+}
+
+func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
+	return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
+}
+
+type EcNode struct {
+	info       *master_pb.DataNodeInfo
+	freeEcSlot int
+}
+
+func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+
+	// list all possible locations
+	var resp *master_pb.VolumeListResponse
+	err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+		resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+		return err
+	})
+	if err != nil {
+		return nil, 0, err
+	}
+
+	// find out all volume servers with one slot left.
+	eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
+		if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
+			ecNodes = append(ecNodes, &EcNode{
+				info:       dn,
+				freeEcSlot: int(freeEcSlots),
+			})
+			totalFreeEcSlots += freeEcSlots
+		}
+	})
+
+	sortEcNodes(ecNodes)
+
+	return
+}
+
+func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
+	collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+
+	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
+
+	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
+			VolumeId:   uint32(volumeId),
+			Collection: collection,
+			ShardIds:   toBeDeletedShardIds,
+		})
+		return deleteErr
+	})
+
+}
+
+func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
+	volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+
+	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
+
+	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
+			VolumeId: uint32(volumeId),
+			ShardIds: toBeUnmountedhardIds,
+		})
+		return deleteErr
+	})
+}
+
+func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
+	collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+
+	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
+
+	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+			VolumeId:   uint32(volumeId),
+			Collection: collection,
+			ShardIds:   toBeMountedhardIds,
+		})
+		return mountErr
+	})
+}
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 14d1ae96b..94265a874 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -5,11 +5,9 @@ import (
 	"flag"
 	"fmt"
 	"io"
-	"sort"
 	"sync"
 	"time"
 
-	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -73,6 +71,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
 	if err != nil {
 		return err
 	}
+	fmt.Printf("ec encode volumes: %v\n", volumeIds)
 	for _, vid := range volumeIds {
 		if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
 			return err
@@ -96,9 +95,9 @@ func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string,
 	}
 
 	// balance the ec shards to current cluster
-	err = balanceEcShards(ctx, commandEnv, vid, collection, locations)
+	err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
 	if err != nil {
-		return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
+		return fmt.Errorf("spread ec shards for volume %d to %s: %v", vid, locations[0].Url, err)
 	}
 
 	return nil
@@ -118,7 +117,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
 
 }
 
-func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
 
 	allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv)
 	if err != nil {
@@ -139,13 +138,19 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl
 	// ask the data nodes to copy from the source volume server
 	copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0])
 	if err != nil {
-		return nil
+		return err
+	}
+
+	// unmount the to be deleted shards
+	err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+	if err != nil {
+		return err
 	}
 
 	// ask the source volume server to clean up copied ec shards
-	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
 	if err != nil {
-		return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err)
+		return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
 	}
 
 	// ask the source volume server to delete the original volume
@@ -176,7 +181,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
 		wg.Add(1)
 		go func(server *EcNode, startFromShardId uint32, shardCount int) {
 			defer wg.Done()
-			copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
+			copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
 				startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
 			if copyErr != nil {
 				err = copyErr
@@ -201,81 +206,12 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
 	return
 }
 
-func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
-	targetServer *EcNode, startFromShardId uint32, shardCount int,
-	volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
-
-	var shardIdsToCopy []uint32
-	for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
-		fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id)
-		shardIdsToCopy = append(shardIdsToCopy, shardId)
-	}
-
-	err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-
-		if targetServer.info.Id != existingLocation {
-
-			_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
-				VolumeId:       uint32(volumeId),
-				Collection:     collection,
-				ShardIds:       shardIdsToCopy,
-				SourceDataNode: existingLocation,
-			})
-			if copyErr != nil {
-				return copyErr
-			}
-		}
-
-		_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
-			VolumeId:   uint32(volumeId),
-			Collection: collection,
-			ShardIds:   shardIdsToCopy,
-		})
-		if mountErr != nil {
-			return mountErr
-		}
-
-		if targetServer.info.Id != existingLocation {
-			copiedShardIds = shardIdsToCopy
-			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
-		}
-
-		return nil
-	})
-
-	if err != nil {
-		return
-	}
-
-	return
-}
-
-func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
-	volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
-
-	shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount
-
-	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
-			VolumeId:        uint32(volumeId),
-			ShardIds:        toBeDeletedShardIds,
-			ShouldDeleteEcx: shouldDeleteEcx,
-		})
-		return deleteErr
-	})
-
-}
-
 func balancedEcDistribution(servers []*EcNode) (allocated []int) {
-	freeSlots := make([]int, len(servers))
 	allocated = make([]int, len(servers))
-	for i, server := range servers {
-		freeSlots[i] = countFreeShardSlots(server.info)
-	}
 	allocatedCount := 0
 	for allocatedCount < erasure_coding.TotalShardsCount {
-		for i, _ := range servers {
-			if freeSlots[i]-allocated[i] > 0 {
+		for i, server := range servers {
+			if server.freeEcSlot-allocated[i] > 0 {
 				allocated[i] += 1
 				allocatedCount += 1
 			}
@@ -288,67 +224,6 @@ func balancedEcDistribution(servers []*EcNode) (allocated []int) {
 	return allocated
 }
 
-func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) {
-	for _, dc := range topo.DataCenterInfos {
-		for _, rack := range dc.RackInfos {
-			for _, dn := range rack.DataNodeInfos {
-				fn(dn)
-			}
-		}
-	}
-}
-
-func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
-	for _, ecShardInfo := range ecShardInfos {
-		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
-		count += shardBits.ShardIdCount()
-	}
-	return
-}
-
-func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
-	return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
-}
-
-type EcNode struct {
-	info       *master_pb.DataNodeInfo
-	freeEcSlot int
-}
-
-func sortEcNodes(ecNodes []*EcNode) {
-	sort.Slice(ecNodes, func(i, j int) bool {
-		return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
-	})
-}
-
-func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
-
-	// list all possible locations
-	var resp *master_pb.VolumeListResponse
-	err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
-		resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
-		return err
-	})
-	if err != nil {
-		return nil, 0, err
-	}
-
-	// find out all volume servers with one slot left.
-	eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
-		if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
-			ecNodes = append(ecNodes, &EcNode{
-				info:       dn,
-				freeEcSlot: int(freeEcSlots),
-			})
-			totalFreeEcSlots += freeEcSlots
-		}
-	})
-
-	sortEcNodes(ecNodes)
-
-	return
-}
-
 func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
 
 	var resp *master_pb.VolumeListResponse
@@ -360,9 +235,11 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, se
 		return
 	}
 
-	quietSeconds := int64((quietPeriod * time.Second).Seconds())
+	quietSeconds := int64(quietPeriod / time.Second)
 	nowUnixSeconds := time.Now().Unix()
 
+	fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
+
 	vidMap := make(map[uint32]bool)
 	for _, dc := range resp.TopologyInfo.DataCenterInfos {
 		for _, r := range dc.RackInfos {
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
new file mode 100644
index 000000000..479b51484
--- /dev/null
+++ b/weed/shell/command_ec_rebuild.go
@@ -0,0 +1,272 @@
+package shell
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"io"
+
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"google.golang.org/grpc"
+)
+
+func init() {
+	commands = append(commands, &commandEcRebuild{})
+}
+
+type commandEcRebuild struct {
+}
+
+func (c *commandEcRebuild) Name() string {
+	return "ec.rebuild"
+}
+
+func (c *commandEcRebuild) Help() string {
+	return `find and rebuild missing ec shards among volume servers
+
+	ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-f]
+
+	Algorithm:
+
+	For each type of volume server (different max volume count limit){
+		for each collection {
+			rebuildEcVolumes()
+		}
+	}
+
+	func rebuildEcVolumes(){
+		idealWritableVolumes = totalWritableVolumes / numVolumeServers
+		for {
+			sort all volume servers ordered by the number of local writable volumes
+			pick the volume server A with the lowest number of writable volumes x
+			pick the volume server B with the highest number of writable volumes y
+			if y > idealWritableVolumes and x +1 <= idealWritableVolumes {
+				if B has a writable volume id v that A does not have {
+					move writable volume v from A to B
+				}
+			}
+		}
+	}
+
+`
+}
+
+func (c *commandEcRebuild) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+
+	fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+	applyChanges := fixCommand.Bool("force", false, "apply the changes")
+	if err = fixCommand.Parse(args); err != nil {
+		return nil
+	}
+
+	// collect all ec nodes
+	allEcNodes, _, err := collectEcNodes(context.Background(), commandEnv)
+	if err != nil {
+		return err
+	}
+
+	if *collection == "EACH_COLLECTION" {
+		collections, err := ListCollectionNames(commandEnv, false, true)
+		if err != nil {
+			return err
+		}
+		fmt.Printf("rebuildEcVolumes collections %+v\n", len(collections))
+		for _, c := range collections {
+			fmt.Printf("rebuildEcVolumes collection %+v\n", c)
+			if err = rebuildEcVolumes(commandEnv, allEcNodes, c, writer, *applyChanges); err != nil {
+				return err
+			}
+		}
+	} else {
+		if err = rebuildEcVolumes(commandEnv, allEcNodes, *collection, writer, *applyChanges); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func rebuildEcVolumes(commandEnv *commandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
+
+	ctx := context.Background()
+
+	fmt.Printf("rebuildEcVolumes %s\n", collection)
+
+	// collect vid => each shard locations, similar to ecShardMap in topology.go
+	ecShardMap := make(EcShardMap)
+	for _, ecNode := range allEcNodes {
+		ecShardMap.registerEcNode(ecNode, collection)
+	}
+
+	for vid, locations := range ecShardMap {
+		shardCount := locations.shardCount()
+		if shardCount == erasure_coding.TotalShardsCount {
+			continue
+		}
+		if shardCount < erasure_coding.DataShardsCount {
+			return fmt.Errorf("ec volume %d is unrepairable with %d shards\n", vid, shardCount)
+		}
+
+		sortEcNodes(allEcNodes)
+
+		if allEcNodes[0].freeEcSlot < erasure_coding.TotalShardsCount {
+			return fmt.Errorf("disk space is not enough")
+		}
+
+		if err := rebuildOneEcVolume(ctx, commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func rebuildOneEcVolume(ctx context.Context, commandEnv *commandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
+
+	fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
+
+	// collect shard files to rebuilder local disk
+	var generatedShardIds []uint32
+	copiedShardIds, _, err := prepareDataToRecover(ctx, commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// clean up working files
+
+		// ask the rebuilder to delete the copied shards
+		err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
+		if err != nil {
+			fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
+		}
+
+		// ask the rebuilder to delete the copied shards
+		err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
+		if err != nil {
+			fmt.Fprintf(writer, "%s delete generated ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, generatedShardIds)
+		}
+
+	}()
+
+	if !applyChanges {
+		return nil
+	}
+
+	// generate ec shards, and maybe ecx file, and mount them
+	generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
+	if err != nil {
+		return err
+	}
+
+	// mount the generated shards
+	err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
+	collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
+
+	err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{
+			VolumeId:   uint32(volumeId),
+			Collection: collection,
+		})
+		if rebultErr == nil {
+			rebuiltShardIds = resp.RebuiltShardIds
+		}
+		return rebultErr
+	})
+	return
+}
+
+func prepareDataToRecover(ctx context.Context, commandEnv *commandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
+
+	needEcxFile := true
+	var localShardBits erasure_coding.ShardBits
+	for _, ecShardInfo := range rebuilder.info.EcShardInfos {
+		if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
+			needEcxFile = false
+			localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+		}
+	}
+
+	for shardId, ecNodes := range locations {
+
+		if len(ecNodes) == 0 {
+			fmt.Fprintf(writer, "missing shard %d.%d\n", volumeId, shardId)
+			continue
+		}
+
+		if localShardBits.HasShardId(erasure_coding.ShardId(shardId)){
+			localShardIds = append(localShardIds, uint32(shardId))
+			fmt.Fprintf(writer, "use existing shard %d.%d\n", volumeId, shardId)
+			continue
+		}
+
+		var copyErr error
+		if applyBalancing{
+			copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+				_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+					VolumeId:       uint32(volumeId),
+					Collection:     collection,
+					ShardIds:       []uint32{uint32(shardId)},
+					CopyEcxFile:    needEcxFile,
+					SourceDataNode: ecNodes[0].info.Id,
+				})
+				return copyErr
+			})
+			if copyErr == nil && needEcxFile {
+				needEcxFile = false
+			}
+		}
+		if copyErr != nil {
+			fmt.Fprintf(writer, "%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr)
+		} else {
+			fmt.Fprintf(writer, "%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id)
+			copiedShardIds = append(copiedShardIds, uint32(shardId))
+		}
+
+	}
+
+	if len(copiedShardIds)+len(localShardIds) >= erasure_coding.DataShardsCount {
+		return copiedShardIds, localShardIds, nil
+	}
+
+	return nil, nil, fmt.Errorf("%d shards are not enough to recover volume %d", len(copiedShardIds)+len(localShardIds), volumeId)
+
+}
+
+type EcShardMap map[needle.VolumeId]EcShardLocations
+type EcShardLocations [][]*EcNode
+
+func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) {
+	for _, shardInfo := range ecNode.info.EcShardInfos {
+		if shardInfo.Collection == collection {
+			existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)]
+			if !found {
+				existing = make([][]*EcNode, erasure_coding.TotalShardsCount)
+				ecShardMap[needle.VolumeId(shardInfo.Id)] = existing
+			}
+			for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() {
+				existing[shardId] = append(existing[shardId], ecNode)
+			}
+		}
+	}
+}
+
+func (ecShardLocations EcShardLocations) shardCount() (count int) {
+	for _, locations := range ecShardLocations {
+		if len(locations) > 0 {
+			count++
+		}
+	}
+	return
+}
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 2a5731333..9108fccaa 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -62,9 +62,9 @@ func (c *commandVolumeBalance) Help() string {
 func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
 
 	balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
-	collection := balanceCommand.String("c", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
+	collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
 	dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
-	applyBalancing := balanceCommand.Bool("f", false, "apply the balancing plan.")
+	applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.")
 	if err = balanceCommand.Parse(args); err != nil {
 		return nil
 	}
```
