diff options
Diffstat (limited to 'weed/shell/command_volume_fsck.go')
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 5e0f8b4b6..e065767a3 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -9,7 +9,7 @@ import ( "math" "os" "path/filepath" - "time" + "sync" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -89,29 +89,32 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. // volume file ids substract filer file ids var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64 for volumeId, vinfo := range volumeIdToServer { - inUseCount, orphanChunkCount, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose) + inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose) if checkErr != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) } totalInUseCount += inUseCount - totalOrphanChunkCount += orphanChunkCount + totalOrphanChunkCount += uint64(len(orphanFileIds)) totalOrphanDataSize += orphanDataSize + + if *applyPurging && len(orphanFileIds) > 0 { + if err = c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil { + return fmt.Errorf("purge for volume %d: %v\n", volumeId, err) + } + } } if totalOrphanChunkCount == 0 { fmt.Fprintf(writer, "no orphan data\n") + return nil } - pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount)) - fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n", - totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize) - - fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n") + if !*applyPurging { + pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount)) + fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n", + totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize) - if *applyPurging { - fmt.Fprintf(writer, "\nstarting to destroy your data ...\n") - time.Sleep(30 * time.Second) - fmt.Fprintf(writer, "just kidding. Not implemented yet.\n") + fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n") } return nil @@ -191,7 +194,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer }) } -func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount, orphanChunkCount, orphanDataSize uint64, err error) { +func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { db := needle_map.NewMemDb() defer db.Close() @@ -207,7 +210,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri dataLen := len(filerFileIdsData) if dataLen%8 != 0 { - return 0, 0, 0, fmt.Errorf("filer data is corrupted") + return 0, nil, 0, fmt.Errorf("filer data is corrupted") } for i := 0; i < len(filerFileIdsData); i += 8 { @@ -216,17 +219,19 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri inUseCount++ } + var orphanFileCount uint64 db.AscendingVisit(func(n needle_map.NeedleValue) error { // fmt.Printf("%d,%x\n", volumeId, n.Key) - orphanChunkCount++ + orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String())) + orphanFileCount++ orphanDataSize += uint64(n.Size) return nil }) - if orphanChunkCount > 0 { - pct := float64(orphanChunkCount*100) / (float64(orphanChunkCount + inUseCount)) + if orphanFileCount > 0 { + pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount)) fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n", - volumeId, orphanChunkCount+inUseCount, orphanChunkCount, pct, orphanDataSize) + volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize) } return @@ -278,6 +283,42 @@ func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (vo return } +func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) { + fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId) + locations, found := c.env.MasterClient.GetLocations(volumeId) + if !found { + return fmt.Errorf("failed to find volume %d locations", volumeId) + } + + resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations)) + var wg sync.WaitGroup + for _, location := range locations { + wg.Add(1) + go func(server string, fidList []string) { + defer wg.Done() + + if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil { + err = deleteErr + } else if deleteResults != nil { + resultChan <- deleteResults + } + + }(location.Url, fileIds) + } + wg.Wait() + close(resultChan) + + for results := range resultChan { + for _, result := range results { + if result.Error != "" { + fmt.Fprintf(writer, "purge error: %s\n", result.Error) + } + } + } + + return +} + func getVolumeFileIdFile(tempFolder string, vid uint32) string { return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid)) } |
