aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_volume_fsck.go46
-rw-r--r--weed/shell/commands.go15
2 files changed, 38 insertions, 23 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 6e3a813cf..72babdb6d 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -13,6 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
@@ -26,6 +27,7 @@ import (
"os"
"path"
"path/filepath"
+ "strings"
"sync"
"time"
)
@@ -35,8 +37,7 @@ func init() {
}
const (
- readbufferSize = 16
- verifyProbeBlobSize = 16
+ readbufferSize = 16
)
type commandVolumeFsck struct {
@@ -88,7 +89,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
- c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "try get head needle blob from volume server")
+ c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "check needles status from volume server")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -263,7 +264,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
serverReplicas := make(map[uint32][]pb.ServerAddress)
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId, vinfo := range volumeIdToVInfo {
- inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, vinfo.collection)
+ inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo)
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
@@ -533,36 +534,35 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
}
}
-func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, collection string) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
+func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
- db := needle_map.NewMemDb()
- defer db.Close()
+ volumeFileIdDb := needle_map.NewMemDb()
+ defer volumeFileIdDb.Close()
- if err = db.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil {
+ if err = volumeFileIdDb.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil {
err = fmt.Errorf("failed to LoadFromIdx %+v", err)
return
}
- volumeAddr := pb.NewServerAddressWithGrpcPort(dataNodeId, 0)
- if err = c.readFilerFileIdFile(volumeId, func(nId types.NeedleId, itemPath util.FullPath) {
+ if err = c.readFilerFileIdFile(volumeId, func(filerNeedleId types.NeedleId, itemPath util.FullPath) {
inUseCount++
if *c.verifyNeedle {
- if v, ok := db.Get(nId); ok && v.Size.IsValid() {
- newSize := types.Size(verifyProbeBlobSize)
- if v.Size > newSize {
- v.Size = newSize
- }
- if _, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, volumeAddr, volumeId, *v); err != nil {
- fmt.Fprintf(c.writer, "failed to read file %s NeedleBlob %+v: %+v", itemPath, nId, err)
- if *c.forcePurging {
- return
+ if needleValue, ok := volumeFileIdDb.Get(filerNeedleId); ok && !needleValue.Size.IsDeleted() {
+ if _, err := readNeedleStatus(c.env.option.GrpcDialOption, vinfo.server, volumeId, *needleValue); err != nil {
+ // files may be deleted during copying filesIds
+ if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
+ fmt.Fprintf(c.writer, "failed to read %d:%s needle status of file %s: %+v\n",
+ volumeId, filerNeedleId.String(), itemPath, err)
+ if *c.forcePurging {
+ return
+ }
}
}
}
}
- if err = db.Delete(nId); err != nil && *c.verbose {
- fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, nId, err)
+ if err = volumeFileIdDb.Delete(filerNeedleId); err != nil && *c.verbose {
+ fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, filerNeedleId, err)
}
}); err != nil {
err = fmt.Errorf("failed to readFilerFileIdFile %+v", err)
@@ -570,8 +570,8 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
}
var orphanFileCount uint64
- if err = db.AscendingVisit(func(n needle_map.NeedleValue) error {
- if !n.Size.IsValid() {
+ if err = volumeFileIdDb.AscendingVisit(func(n needle_map.NeedleValue) error {
+ if n.Size.IsDeleted() {
return nil
}
orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 66fdcb6bd..af6888458 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -169,3 +169,18 @@ func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddres
)
return
}
+
+func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) {
+ err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption,
+ func(client volume_server_pb.VolumeServerClient) error {
+ if resp, err = client.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
+ VolumeId: volumeId,
+ NeedleId: uint64(needleValue.Key),
+ }); err != nil {
+ return err
+ }
+ return nil
+ },
+ )
+ return
+}