diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2021-08-16 00:54:51 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-08-16 00:54:51 +0800 |
| commit | 27c05f8c0b5c7bda43babeb61d79684d11851111 (patch) | |
| tree | d235573112ce168ca904acbc3932ed12e94de80c /weed/shell/command_volume_fix_replication.go | |
| parent | 97ad3e9d027216d74132652d4d899c7fc7c33ab1 (diff) | |
| parent | ec989b037717f8fd7f0ed3bbc80f0a33654fe7aa (diff) | |
| download | seaweedfs-27c05f8c0b5c7bda43babeb61d79684d11851111.tar.xz seaweedfs-27c05f8c0b5c7bda43babeb61d79684d11851111.zip | |
Merge pull request #80 from chrislusf/master
sync
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 109 |
1 files changed, 60 insertions, 49 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 538351fd0..20d004c6b 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -58,6 +58,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") + retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") if err = volFixReplicationCommand.Parse(args); err != nil { return nil } @@ -100,7 +101,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } // find the most under populated data nodes - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations) + return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount) } @@ -154,64 +155,74 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) { for _, vid := range underReplicatedVolumeIds { - replicas := volumeReplicas[vid] - replica := pickOneReplicaToCopyFrom(replicas) - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) - foundNewLocation := false - hasSkippedCollection := false - keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) - for _, dst := range allLocations { - // check whether data nodes satisfy the constraints - if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { - // check collection name pattern - if *c.collectionPattern != "" { - matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) - if err != nil { - return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) - } - if !matched { - hasSkippedCollection = true - break - } - } - - // ask the volume server to replicate the volume - foundNewLocation = true - fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) + for i := 0; i < retryCount+1; i++ { + if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { + break + } + } + } + return +} - if !takeAction { +func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { + replicas := volumeReplicas[vid] + replica := pickOneReplicaToCopyFrom(replicas) + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + foundNewLocation := false + hasSkippedCollection := false + keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) + for _, dst := range allLocations { + // check whether data nodes satisfy the constraints + if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + hasSkippedCollection = true break } + } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: replica.info.Id, - SourceDataNode: replica.location.dataNode.Id, - }) - if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) - } - return nil - }) - - if err != nil { - return err - } + // ask the volume server to replicate the volume + foundNewLocation = true + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) + if !takeAction { // adjust free volume count dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- break } - } - if !foundNewLocation && !hasSkippedCollection { - fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + VolumeId: replica.info.Id, + SourceDataNode: replica.location.dataNode.Id, + }) + if replicateErr != nil { + return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) + } + return nil + }) + + if err != nil { + return err + } + + // adjust free volume count + dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- + break } + } + if !foundNewLocation && !hasSkippedCollection { + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } return nil } @@ -401,14 +412,14 @@ func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_b sort.Slice(replicas, func(i, j int) bool { a, b := replicas[i], replicas[j] - if a.info.CompactRevision != b.info.CompactRevision { - return a.info.CompactRevision < b.info.CompactRevision + if a.info.Size != b.info.Size { + return a.info.Size < b.info.Size } if a.info.ModifiedAtSecond != b.info.ModifiedAtSecond { return a.info.ModifiedAtSecond < b.info.ModifiedAtSecond } - if a.info.Size != b.info.Size { - return a.info.Size < b.info.Size + if a.info.CompactRevision != b.info.CompactRevision { + return a.info.CompactRevision < b.info.CompactRevision } return false }) |
