diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-07-16 11:46:04 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-07-16 11:47:37 -0700 |
| commit | fb7a1be1c4141b0e896d95a031a408ea7a1079e6 (patch) | |
| tree | 7c9cef4d4ce1a74314b49dd331414556b1344bd4 /weed/shell/command_volume_fix_replication.go | |
| parent | 3f31c4091a4ab07cb29b792e8d7505336ceecd51 (diff) | |
| download | seaweedfs-fb7a1be1c4141b0e896d95a031a408ea7a1079e6.tar.xz seaweedfs-fb7a1be1c4141b0e896d95a031a408ea7a1079e6.zip | |
refactor
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 94 |
1 files changed, 51 insertions, 43 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 538351fd0..b48bd3ea0 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -157,61 +157,69 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { for _, vid := range underReplicatedVolumeIds { - replicas := volumeReplicas[vid] - replica := pickOneReplicaToCopyFrom(replicas) - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) - foundNewLocation := false - hasSkippedCollection := false - keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) - for _, dst := range allLocations { - // check whether data nodes satisfy the constraints - if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { - // check collection name pattern - if *c.collectionPattern != "" { - matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) - if err != nil { - return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) - } - if !matched { - hasSkippedCollection = true - break - } - } + err := c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations) + if err != nil { + return err + } - // ask the volume server to replicate the volume - foundNewLocation = true - fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) + } + return nil +} - if !takeAction { +func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { + replicas := volumeReplicas[vid] + replica := pickOneReplicaToCopyFrom(replicas) + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + foundNewLocation := false + hasSkippedCollection := false + keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) + for _, dst := range allLocations { + // check whether data nodes satisfy the constraints + if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + hasSkippedCollection = true break } + } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: replica.info.Id, - SourceDataNode: replica.location.dataNode.Id, - }) - if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) - } - return nil - }) + // ask the volume server to replicate the volume + foundNewLocation = true + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) - if err != nil { - return err + if !takeAction { + break + } + + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + VolumeId: replica.info.Id, + SourceDataNode: replica.location.dataNode.Id, + }) + if replicateErr != nil { + return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) } + return nil + }) - // adjust free volume count - dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- - break + if err != nil { + return err } - } - if !foundNewLocation && !hasSkippedCollection { - fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) + // adjust free volume count + dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- + break } + } + if !foundNewLocation && !hasSkippedCollection { + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } return nil } |
