diff options
Diffstat (limited to 'weed/shell/command_volume_fsck.go')
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 55 |
1 files changed, 40 insertions, 15 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index e48e53d85..cae8e22d4 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -2,7 +2,9 @@ package shell import ( "bufio" + "bytes" "context" + "errors" "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -11,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" @@ -72,6 +75,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging") purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") + cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks") if err = fsckCommand.Parse(args); err != nil { return nil @@ -126,7 +130,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. delete(volumeIdToVInfo, volumeId) continue } - err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer) + cutoffFrom := time.Now().Add(-*cutoffTimeAgo).UnixNano() + err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer, uint64(cutoffFrom)) if err != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) } @@ -351,7 +356,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn return nil } -func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error { +func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer, cutoffFrom uint64) error { if verbose { fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) @@ -377,13 +382,42 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeI return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err) } - err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)) + var buf bytes.Buffer + for { + resp, err := copyFileClient.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return err + } + buf.Write(resp.FileContent) + } + if vinfo.isReadOnly == false { + index, err := idx.FirstInvalidIndex(buf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { + resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ + VolumeId: volumeId, + NeedleId: uint64(key), + Offset: offset.ToActualOffset(), + Size: int32(size), + }) + if err != nil { + return false, fmt.Errorf("to read needle meta with id %d from volume %d with error %v", key, volumeId, err) + } + return resp.LastModified <= cutoffFrom, nil + }) + if err != nil { + fmt.Fprintf(writer, "Failed to search for last vilad index on volume %d with error %v", volumeId, err) + } + buf.Truncate(index * types.NeedleMapEntrySize) + } + idxFilename := getVolumeFileIdFile(tempFolder, dataNodeId, volumeId) + err = writeToFile(buf.Bytes(), idxFilename) if err != nil { return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err) } return nil - }) } @@ -673,7 +707,7 @@ func getFilerFileIdFile(tempFolder string, vid uint32) string { return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid)) } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error { +func writeToFile(bytes []byte, fileName string) error { flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC dst, err := os.OpenFile(fileName, flags, 0644) if err != nil { @@ -681,15 +715,6 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s } defer dst.Close() - for { - resp, receiveErr := client.Recv() - if receiveErr == io.EOF { - break - } - if receiveErr != nil { - return fmt.Errorf("receiving %s: %v", fileName, receiveErr) - } - dst.Write(resp.FileContent) - } + dst.Write(bytes) return nil } |
