aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Yang <lanqingy@usc.edu>2022-09-10 15:29:17 -0700
committerGitHub <noreply@github.com>2022-09-10 15:29:17 -0700
commitddd6bee970e5a09903b115d48f47ea729f0d6d3e (patch)
treef8e2d85be4a87ca665dbb722f1402af16754003a
parent2c6b68b40effc0ed96439a5ba09242e463c303f3 (diff)
downloadseaweedfs-ddd6bee970e5a09903b115d48f47ea729f0d6d3e.tar.xz
seaweedfs-ddd6bee970e5a09903b115d48f47ea729f0d6d3e.zip
ADHOC: Volume fsck use a time cutoff param (#3626)
* ADHOC: cut off volumn fsck * more * fix typo * add test * modify name * fix comment * fix comments * nit * fix typo * Update weed/shell/command_volume_fsck.go Co-authored-by: root <root@HQ-10MSTD3EY.roblox.local> Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
-rw-r--r--weed/shell/command_volume_fsck.go55
-rw-r--r--weed/storage/idx/binary_search.go29
-rw-r--r--weed/storage/idx_binary_search_test.go57
3 files changed, 126 insertions, 15 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index e48e53d85..cae8e22d4 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -2,7 +2,9 @@ package shell
import (
"bufio"
+ "bytes"
"context"
+ "errors"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -11,6 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
@@ -72,6 +75,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging")
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
+ cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -126,7 +130,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
delete(volumeIdToVInfo, volumeId)
continue
}
- err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer)
+ cutoffFrom := time.Now().Add(-*cutoffTimeAgo).UnixNano()
+ err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer, uint64(cutoffFrom))
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
}
@@ -351,7 +356,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
return nil
}
-func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
+func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer, cutoffFrom uint64) error {
if verbose {
fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
@@ -377,13 +382,42 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeI
return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
}
- err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, dataNodeId, volumeId))
+ var buf bytes.Buffer
+ for {
+ resp, err := copyFileClient.Recv()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ buf.Write(resp.FileContent)
+ }
+ if vinfo.isReadOnly == false {
+ index, err := idx.FirstInvalidIndex(buf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
+ resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
+ VolumeId: volumeId,
+ NeedleId: uint64(key),
+ Offset: offset.ToActualOffset(),
+ Size: int32(size),
+ })
+ if err != nil {
+ return false, fmt.Errorf("to read needle meta with id %d from volume %d with error %v", key, volumeId, err)
+ }
+ return resp.LastModified <= cutoffFrom, nil
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "Failed to search for last vilad index on volume %d with error %v", volumeId, err)
+ }
+ buf.Truncate(index * types.NeedleMapEntrySize)
+ }
+ idxFilename := getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)
+ err = writeToFile(buf.Bytes(), idxFilename)
if err != nil {
return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
}
return nil
-
})
}
@@ -673,7 +707,7 @@ func getFilerFileIdFile(tempFolder string, vid uint32) string {
return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
}
-func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
+func writeToFile(bytes []byte, fileName string) error {
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
@@ -681,15 +715,6 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
}
defer dst.Close()
- for {
- resp, receiveErr := client.Recv()
- if receiveErr == io.EOF {
- break
- }
- if receiveErr != nil {
- return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
- }
- dst.Write(resp.FileContent)
- }
+ dst.Write(bytes)
return nil
}
diff --git a/weed/storage/idx/binary_search.go b/weed/storage/idx/binary_search.go
new file mode 100644
index 000000000..93bdfd7d8
--- /dev/null
+++ b/weed/storage/idx/binary_search.go
@@ -0,0 +1,29 @@
+package idx
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+// FirstInvalidIndex finds the first index that fails the lessThanOrEqualToFn predicate; every entry before it satisfies the predicate.
+func FirstInvalidIndex(bytes []byte, lessThanOrEqualToFn func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error)) (int, error) {
+ left, right := 0, len(bytes)/types.NeedleMapEntrySize-1
+ index := right + 1
+ for left <= right {
+ mid := left + (right-left)>>1
+ loc := mid * types.NeedleMapEntrySize
+ key := types.BytesToNeedleId(bytes[loc : loc+types.NeedleIdSize])
+ offset := types.BytesToOffset(bytes[loc+types.NeedleIdSize : loc+types.NeedleIdSize+types.OffsetSize])
+ size := types.BytesToSize(bytes[loc+types.NeedleIdSize+types.OffsetSize : loc+types.NeedleIdSize+types.OffsetSize+types.SizeSize])
+ res, err := lessThanOrEqualToFn(key, offset, size)
+ if err != nil {
+ return -1, err
+ }
+ if res {
+ left = mid + 1
+ } else {
+ index = mid
+ right = mid - 1
+ }
+ }
+ return index, nil
+}
diff --git a/weed/storage/idx_binary_search_test.go b/weed/storage/idx_binary_search_test.go
new file mode 100644
index 000000000..48f48852e
--- /dev/null
+++ b/weed/storage/idx_binary_search_test.go
@@ -0,0 +1,57 @@
+package storage
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/storage/idx"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+)
+
+func TestFirstInvalidIndex(t *testing.T) {
+ dir := t.TempDir()
+
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
+ if err != nil {
+ t.Fatalf("volume creation: %v", err)
+ }
+ type WriteInfo struct {
+ offset int64
+ size int32
+ }
+ // write 30 needles with keys 1 through 30
+ for i := 1; i <= 30; i++ {
+ n := newRandomNeedle(uint64(i))
+ n.Flags = 0x08
+ _, _, _, err := v.writeNeedle2(n, true, false)
+ if err != nil {
+ t.Fatalf("write needle %d: %v", i, err)
+ }
+ }
+ b, err := os.ReadFile(v.IndexFileName() + ".idx")
+ // base case every record is valid -> nothing is filtered
+ index, err := idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
+ return true, nil
+ })
+ if err != nil {
+ t.Fatalf("failed to complete binary search %v", err)
+ }
+ assert.Equal(t, 30, index, "when every record is valid nothing should be filtered from binary search")
+ index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
+ return false, nil
+ })
+ assert.Equal(t, 0, index, "when every record is invalid everything should be filtered from binary search")
+ index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
+ return key < 20, nil
+ })
+ // needle key range from 1 to 30 so < 20 means 19 keys are valid and cutoff the bytes at 19 * 16 = 304
+ assert.Equal(t, 19, index, "when every record is invalid everything should be filtered from binary search")
+
+ index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
+ return key <= 1, nil
+ })
+ // needle key range from 1 to 30 so <= 1 means 1 key is valid and cutoff the bytes at 1 * 16 = 16
+ assert.Equal(t, 1, index, "when every record is invalid everything should be filtered from binary search")
+}