aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-04-01 14:45:41 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-04-01 14:45:41 +0500
commit7f1383a41ef8011224f15fd9252aa0677af62324 (patch)
tree8a70d28ab31ad12220265a762d6d0d8f29f642d8
parent3817e05dd043f024854b2c9acae6daecf2ff14b8 (diff)
downloadseaweedfs-7f1383a41ef8011224f15fd9252aa0677af62324.tar.xz
seaweedfs-7f1383a41ef8011224f15fd9252aa0677af62324.zip
findExtraChunksInVolumeServers in consideration of replication
-rw-r--r--weed/shell/command_volume_fsck.go68
1 files changed, 52 insertions, 16 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 557c1e18e..1aa33e054 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -245,13 +245,32 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error {
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
-
+ volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
+ isSeveralReplicas := make(map[uint32]bool)
+ isEcVolumeReplicas := make(map[uint32]bool)
+ isReadOnlyReplicas := make(map[uint32]bool)
+ serverReplicas := make(map[uint32][]pb.ServerAddress)
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId, vinfo := range volumeIdToVInfo {
inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, dataNodeId, volumeId, writer, verbose)
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
+ isSeveralReplicas[volumeId] = false
+ if _, found := volumeIdOrphanFileIds[volumeId]; !found {
+ volumeIdOrphanFileIds[volumeId] = make(map[string]bool)
+ } else {
+ isSeveralReplicas[volumeId] = true
+ }
+ for _, fid := range orphanFileIds {
+ if isSeveralReplicas[volumeId] {
+ if _, found := volumeIdOrphanFileIds[volumeId][fid]; !found {
+ continue
+ }
+ }
+ volumeIdOrphanFileIds[volumeId][fid] = isSeveralReplicas[volumeId]
+ }
+
totalInUseCount += inUseCount
totalOrphanChunkCount += uint64(len(orphanFileIds))
totalOrphanDataSize += orphanDataSize
@@ -261,31 +280,48 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
fmt.Fprintf(writer, "%s\n", fid)
}
}
+ isEcVolumeReplicas[volumeId] = vinfo.isEcVolume
+ if isReadOnly, found := isReadOnlyReplicas[volumeId]; !(found && isReadOnly) {
+ isReadOnlyReplicas[volumeId] = vinfo.isReadOnly
+ }
+ serverReplicas[volumeId] = append(serverReplicas[volumeId], vinfo.server)
+ }
- if applyPurging && len(orphanFileIds) > 0 {
- if verbose {
- fmt.Fprintf(writer, "purging process for volume %d", volumeId)
- }
-
- if vinfo.isEcVolume {
- fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
- continue
+ for volumeId, orphanReplicaFileIds := range volumeIdOrphanFileIds {
+ if !(applyPurging && len(orphanReplicaFileIds) > 0) {
+ continue
+ }
+ orphanFileIds := []string{}
+ for fid, foundInAllReplicas := range orphanReplicaFileIds {
+ if !isSeveralReplicas[volumeId] || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
+ orphanFileIds = append(orphanFileIds, fid)
}
+ }
+ if !(len(orphanFileIds) > 0) {
+ continue
+ }
+ if verbose {
+ fmt.Fprintf(writer, "purging process for volume %d", volumeId)
+ }
+ if isEcVolumeReplicas[volumeId] {
+ fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
+ continue
+ }
+ for _, server := range serverReplicas[volumeId] {
needleVID := needle.VolumeId(volumeId)
- if vinfo.isReadOnly {
- err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, true)
+ if isReadOnlyReplicas[volumeId] {
+ err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true)
if err != nil {
return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
}
- fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
- defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, false)
- }
-
- fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
+ fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
+ defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false)
+ fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
+ }
if verbose {
fmt.Fprintf(writer, "purging files from volume %d\n", volumeId)
}