| field | value | date |
|---|---|---|
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2025-11-11 07:43:43 +0100 |
| committer | GitHub <noreply@github.com> | 2025-11-10 22:43:43 -0800 |
| commit | 79fa87bad4d6455631356a5095f5b0e9fc06eef6 (patch) | |
| tree | 4d37c29fc55a5786c7b025d1c9067675e9208817 /weed/shell/command_ec_rebuild.go | |
| parent | bf8e4f40e60e74ce03c2f497c6245e5d1460f1d3 (diff) | |
| download | seaweedfs-79fa87bad4d6455631356a5095f5b0e9fc06eef6.tar.xz seaweedfs-79fa87bad4d6455631356a5095f5b0e9fc06eef6.zip | |
Rework parameter passing for functions within `ec.rebuild` (#7445)
* Rework parameter passing for functions within `ec.rebuild`
This simplifies the overall codebase and allows parallelization to be handled cleanly via wait groups.
* fix copy source
* add tests
* remove tests that are not useful
* fmt
* nil check
---------
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
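
In effect, the state that every helper used to take as arguments (command environment, EC node list, output writer, apply flag) now hangs off an `ecRebuilder` receiver, and helpers only take what varies per call. A minimal before/after sketch of that pattern, with `CommandEnv` and `EcNode` reduced to stand-in stubs so it compiles on its own (the real types live in `weed/shell`, and the function bodies here are illustrative only):

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// Stand-in stubs so the sketch compiles on its own; not the real weed/shell types.
type CommandEnv struct{}
type EcNode struct{ freeEcSlot int }

// Old shape: shared state is threaded through every helper's signature.
func rebuildEcVolumesOld(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
	fmt.Fprintf(writer, "rebuild collection %s (apply=%v, %d nodes)\n", collection, applyChanges, len(allEcNodes))
	return nil
}

// New shape: the shared state lives on a receiver, mirroring the ecRebuilder struct in the diff.
type ecRebuilder struct {
	commandEnv   *CommandEnv
	ecNodes      []*EcNode
	writer       io.Writer
	applyChanges bool
}

func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
	fmt.Fprintf(erb.writer, "rebuild collection %s (apply=%v, %d nodes)\n", collection, erb.applyChanges, len(erb.ecNodes))
	return nil
}

func main() {
	env := &CommandEnv{}
	nodes := []*EcNode{{freeEcSlot: 14}}

	// Before: every call site repeats the same five arguments.
	_ = rebuildEcVolumesOld(env, nodes, "pics", os.Stdout, false)

	// After: construct the rebuilder once, then pass only per-call data.
	erb := &ecRebuilder{commandEnv: env, ecNodes: nodes, writer: os.Stdout, applyChanges: false}
	_ = erb.rebuildEcVolumes("pics")
}
```

Because the receiver owns the shared state, adding a wait group or error aggregation later only touches the struct and the call sites of `rebuildEcVolumes`, not every function signature in between.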
Diffstat (limited to 'weed/shell/command_ec_rebuild.go')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | weed/shell/command_ec_rebuild.go | 102 |

1 file changed, 71 insertions, 31 deletions
```diff
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index bef56d191..3b2c63cc7 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -11,13 +11,21 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
     "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
     "github.com/seaweedfs/seaweedfs/weed/storage/needle"
-    "google.golang.org/grpc"
 )
 
 func init() {
     Commands = append(Commands, &commandEcRebuild{})
 }
 
+type ecRebuilder struct {
+    // TODO: add ErrorWaitGroup for parallelization
+    commandEnv   *CommandEnv
+    ecNodes      []*EcNode
+    writer       io.Writer
+    applyChanges bool
+    collections  []string
+}
+
 type commandEcRebuild struct {
 }
 
@@ -93,10 +101,18 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
         collections = []string{*collection}
     }
 
+    erb := &ecRebuilder{
+        commandEnv:   commandEnv,
+        ecNodes:      allEcNodes,
+        writer:       writer,
+        applyChanges: *applyChanges,
+        collections:  collections,
+    }
+
     fmt.Printf("rebuildEcVolumes for %d collection(s)\n", len(collections))
     for _, c := range collections {
         fmt.Printf("rebuildEcVolumes collection %s\n", c)
-        if err = rebuildEcVolumes(commandEnv, allEcNodes, c, writer, *applyChanges); err != nil {
+        if err = erb.rebuildEcVolumes(c); err != nil {
             return err
         }
     }
@@ -104,13 +120,36 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
     return nil
 }
 
-func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
+func (erb *ecRebuilder) write(format string, a ...any) {
+    fmt.Fprintf(erb.writer, format, a...)
+}
+
+func (erb *ecRebuilder) isLocked() bool {
+    return erb.commandEnv.isLocked()
+}
+
+// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder.
+func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
+    if len(erb.ecNodes) == 0 {
+        return nil
+    }
+    res := erb.ecNodes[0]
+    for i := 1; i < len(erb.ecNodes); i++ {
+        if erb.ecNodes[i].freeEcSlot > res.freeEcSlot {
+            res = erb.ecNodes[i]
+        }
+    }
+
+    return res
+}
+
+func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
     fmt.Printf("rebuildEcVolumes %s\n", collection)
 
     // collect vid => each shard locations, similar to ecShardMap in topology.go
     ecShardMap := make(EcShardMap)
-    for _, ecNode := range allEcNodes {
+    for _, ecNode := range erb.ecNodes {
         ecShardMap.registerEcNode(ecNode, collection)
     }
 
@@ -120,16 +159,10 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
             continue
         }
         if shardCount < erasure_coding.DataShardsCount {
-            return fmt.Errorf("ec volume %d is unrepairable with %d shards\n", vid, shardCount)
-        }
-
-        sortEcNodesByFreeslotsDescending(allEcNodes)
-
-        if allEcNodes[0].freeEcSlot < erasure_coding.TotalShardsCount {
-            return fmt.Errorf("disk space is not enough")
+            return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
         }
 
-        if err := rebuildOneEcVolume(commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
+        if err := erb.rebuildOneEcVolume(collection, vid, locations); err != nil {
             return err
         }
     }
@@ -137,17 +170,25 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
     return nil
 }
 
-func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
-
-    if !commandEnv.isLocked() {
+func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations) error {
+    if !erb.isLocked() {
         return fmt.Errorf("lock is lost")
     }
 
+    // TODO: fix this logic so it supports concurrent executions
+    rebuilder := erb.ecNodeWithMoreFreeSlots()
+    if rebuilder == nil {
+        return fmt.Errorf("no EC nodes available for rebuild")
+    }
+    if rebuilder.freeEcSlot < erasure_coding.TotalShardsCount {
+        return fmt.Errorf("disk space is not enough")
+    }
+
     fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
 
     // collect shard files to rebuilder local disk
     var generatedShardIds []uint32
-    copiedShardIds, _, err := prepareDataToRecover(commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
+    copiedShardIds, _, err := erb.prepareDataToRecover(rebuilder, collection, volumeId, locations)
     if err != nil {
         return err
     }
@@ -155,25 +196,25 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
         // clean up working files
 
         // ask the rebuilder to delete the copied shards
-        err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), copiedShardIds)
+        err = sourceServerDeleteEcShards(erb.commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), copiedShardIds)
         if err != nil {
-            fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
+            erb.write("%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
         }
 
     }()
 
-    if !applyChanges {
+    if !erb.applyChanges {
         return nil
     }
 
     // generate ec shards, and maybe ecx file
-    generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info))
+    generatedShardIds, err = erb.generateMissingShards(collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info))
     if err != nil {
         return err
     }
 
     // mount the generated shards
-    err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), generatedShardIds)
+    err = mountEcShards(erb.commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), generatedShardIds)
     if err != nil {
         return err
     }
@@ -183,9 +224,9 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
     return nil
 }
 
-func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {
+func (erb *ecRebuilder) generateMissingShards(collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {
 
-    err = operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+    err = operation.WithVolumeServerClient(false, sourceLocation, erb.commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
         resp, rebuildErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
             VolumeId:   uint32(volumeId),
             Collection: collection,
@@ -198,7 +239,7 @@ func generateMissingShards(grpcDialOption grpc.DialOption, collection string, vo
     return
 }
 
-func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
+func (erb *ecRebuilder) prepareDataToRecover(rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations) (copiedShardIds []uint32, localShardIds []uint32, err error) {
 
     needEcxFile := true
     var localShardBits erasure_coding.ShardBits
@@ -212,21 +253,20 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection
     }
 
     for shardId, ecNodes := range locations {
-
         if len(ecNodes) == 0 {
-            fmt.Fprintf(writer, "missing shard %d.%d\n", volumeId, shardId)
+            erb.write("missing shard %d.%d\n", volumeId, shardId)
             continue
         }
 
         if localShardBits.HasShardId(erasure_coding.ShardId(shardId)) {
             localShardIds = append(localShardIds, uint32(shardId))
-            fmt.Fprintf(writer, "use existing shard %d.%d\n", volumeId, shardId)
+            erb.write("use existing shard %d.%d\n", volumeId, shardId)
             continue
         }
 
         var copyErr error
-        if applyBalancing {
-            copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+        if erb.applyChanges {
+            copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), erb.commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
                 _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
                     VolumeId:    uint32(volumeId),
                     Collection:  collection,
@@ -243,9 +283,9 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection
             }
         }
         if copyErr != nil {
-            fmt.Fprintf(writer, "%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr)
+            erb.write("%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr)
         } else {
-            fmt.Fprintf(writer, "%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id)
+            erb.write("%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id)
             copiedShardIds = append(copiedShardIds, uint32(shardId))
         }
```
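
The new struct only carries a `TODO: add ErrorWaitGroup for parallelization`, and `rebuildOneEcVolume` is explicitly marked as not yet safe for concurrent execution, so nothing is parallelized in this commit. Purely as a hypothetical sketch of where the refactor points (not the project's implementation; `rebuildCollection` below is a stand-in for `erb.rebuildEcVolumes`), the per-collection loop could later fan out with a plain wait group plus error collection:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// rebuildCollection is a hypothetical stand-in for erb.rebuildEcVolumes(collection).
func rebuildCollection(collection string) error {
	if collection == "broken" {
		return fmt.Errorf("rebuild failed for collection %q", collection)
	}
	return nil
}

func main() {
	collections := []string{"pics", "logs", "broken"}

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, c := range collections {
		wg.Add(1)
		go func(c string) {
			defer wg.Done()
			// Each collection is rebuilt independently; errors are collected, not fatal.
			if err := rebuildCollection(c); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(c)
	}
	wg.Wait()

	// errors.Join returns nil when no goroutine reported an error.
	if err := errors.Join(errs...); err != nil {
		fmt.Println("ec.rebuild finished with errors:", err)
	}
}
```

How a real implementation would pick a rebuilder node safely under concurrency, and whether it would parallelize per collection or per volume, is left open by the TODOs in the diff.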
