diff options
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2025-11-11 01:03:38 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-10 16:03:38 -0800 |
| commit | 9744382a183f9b2d0ca0db44ffd30192631000aa (patch) | |
| tree | f96353c600dd6740f50dd4a78f444859d74cf93c /weed/shell/command_volume_check_disk.go | |
| parent | d8dd3e4c5348518964d965bdfc15e74d1c4ca81a (diff) | |
| download | seaweedfs-9744382a183f9b2d0ca0db44ffd30192631000aa.tar.xz seaweedfs-9744382a183f9b2d0ca0db44ffd30192631000aa.zip | |
Rework parameters passing for functions within `volume.check.disk`. (#7448)
* Rework parameters passing for functions within `volume.check.disk`.
We'll need to rework this logic to account for read-only volumes, and there're already way too many parameters shuffled around.
Grouping these into a single struct simplifies the overall codebase.
* similar fix
* Improved Error Handling in Tests
* propagate the errors
* edge cases
* edge case on modified time
* clean up
---------
Co-authored-by: chrislu <chris.lu@gmail.com>
Diffstat (limited to 'weed/shell/command_volume_check_disk.go')
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 296 |
1 files changed, 176 insertions, 120 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 2abfc288c..740c9679d 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -8,7 +8,6 @@ import ( "io" "math" "net/http" - "sync" "time" "slices" @@ -26,9 +25,18 @@ func init() { Commands = append(Commands, &commandVolumeCheckDisk{}) } -type commandVolumeCheckDisk struct { - env *CommandEnv - writer io.Writer +type commandVolumeCheckDisk struct{} + +type volumeCheckDisk struct { + commandEnv *CommandEnv + writer io.Writer + now time.Time + + slowMode bool + verbose bool + applyChanges bool + syncDeletions bool + nonRepairThreshold float64 } func (c *commandVolumeCheckDisk) Name() string { @@ -53,67 +61,6 @@ func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool { return tag == ResourceHeavy } -func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) { - err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ - VolumeId: uint32(vid), - }) - if resp != nil { - totalFileCount = resp.FileCount - deletedFileCount = resp.FileDeletedCount - } - return reqErr - }) - if err != nil { - fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err) - } - return totalFileCount, deletedFileCount -} - -func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) { - var waitGroup sync.WaitGroup - var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64 - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - fileCountA, fileDeletedCountA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode) - }() - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - fileCountB, fileDeletedCountB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode) - }() - // Trying to synchronize a remote call to two nodes - waitGroup.Wait() - return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB -} - -func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTime time.Time, syncDeletions, verbose bool) bool { - pulseTimeAtSecond := pulseTime.Unix() - doSyncDeletedCount := false - if syncDeletions && a.info.DeleteCount != b.info.DeleteCount { - doSyncDeletedCount = true - } - if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount { - // Do synchronization of volumes, if the modification time was before the last pulsation time - if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond { - return false - } - if eqFileCount, eqDeletedFileCount := c.eqVolumeFileCount(a, b); eqFileCount { - if doSyncDeletedCount && !eqDeletedFileCount { - return false - } - if verbose { - fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n", - a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) - } - } else { - return false - } - } - return true -} - func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -135,11 +82,20 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if err = commandEnv.confirmIsLocked(args); err != nil { return } - c.env = commandEnv - c.writer = writer + + vcd := &volumeCheckDisk{ + commandEnv: commandEnv, + writer: writer, + now: time.Now(), + + slowMode: *slowMode, + verbose: *verbose, + applyChanges: *applyChanges, + syncDeletions: *syncDeletions, + nonRepairThreshold: *nonRepairThreshold, + } // collect topology information - pulseTime := time.Now().Add(-constants.VolumePulsePeriod * 2) topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) if err != nil { return err @@ -155,7 +111,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write var writableReplicas []*VolumeReplica for _, replica := range replicas { if replica.info.ReadOnly { - fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) } else { writableReplicas = append(writableReplicas, replica) } @@ -166,13 +122,19 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write }) for len(writableReplicas) >= 2 { a, b := writableReplicas[0], writableReplicas[1] - if !*slowMode && c.shouldSkipVolume(a, b, pulseTime, *syncDeletions, *verbose) { - // always choose the larger volume to be the source - writableReplicas = append(replicas[:1], writableReplicas[2:]...) - continue + if !vcd.slowMode { + shouldSkip, err := vcd.shouldSkipVolume(a, b) + if err != nil { + vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err) + // Continue with sync despite error to be safe + } else if shouldSkip { + // always choose the larger volume to be the source + writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) + continue + } } - if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil { - fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + if err := vcd.syncTwoReplicas(a, b); err != nil { + vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } // always choose the larger volume to be the source if a.info.FileCount > b.info.FileCount { @@ -186,32 +148,134 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write return nil } -func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) { +func (vcd *volumeCheckDisk) isLocked() bool { + return vcd.commandEnv.isLocked() +} + +func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { + return vcd.commandEnv.option.GrpcDialOption +} + +func (vcd *volumeCheckDisk) write(format string, a ...any) { + fmt.Fprintf(vcd.writer, format, a...) +} + +func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) { + if vcd.verbose { + fmt.Fprintf(vcd.writer, format, a...) + } +} + +// getVolumeStatusFileCount retrieves the current file count and deleted file count +// from a volume server via gRPC. +func (vcd *volumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64, err error) { + err = operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ + VolumeId: uint32(vid), + }) + if resp != nil { + totalFileCount = resp.FileCount + deletedFileCount = resp.FileDeletedCount + } + return reqErr + }) + return totalFileCount, deletedFileCount, err +} + +// eqVolumeFileCount compares the real-time file counts of two volume replicas +// by making sequential gRPC calls to their volume servers. +// +// Returns: +// - bool: true if file counts match +// - bool: true if deleted file counts match +// - error: any error from volume server communication +// +// Error Handling: Errors from getVolumeStatusFileCount are wrapped with context +// (volume ID and server) and propagated up. Uses fmt.Errorf with %w to maintain +// error chain for errors.Is() and errors.As(). +func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool, error) { + fileCountA, fileDeletedCountA, errA := vcd.getVolumeStatusFileCount(a.info.Id, a.location.dataNode) + if errA != nil { + return false, false, fmt.Errorf("getting volume %d status from %s: %w", a.info.Id, a.location.dataNode.Id, errA) + } + + fileCountB, fileDeletedCountB, errB := vcd.getVolumeStatusFileCount(b.info.Id, b.location.dataNode) + if errB != nil { + return false, false, fmt.Errorf("getting volume %d status from %s: %w", b.info.Id, b.location.dataNode.Id, errB) + } + + return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB, nil +} + +// shouldSkipVolume determines whether two volume replicas should skip synchronization. +// +// Logic: +// 1. If file counts and delete counts match (when syncDeletions enabled), skip sync +// 2. If counts differ AND both volumes were modified recently (>= pulseTimeAtSecond), +// they may still be actively receiving writes, so we return true to skip sync and +// avoid false positives +// 3. If counts differ AND at least one volume was modified before the pulse cutoff, +// call eqVolumeFileCount to get real-time counts from volume servers +// +// Returns: +// - bool: true if sync should be skipped +// - error: any error from volume server communication (when eqVolumeFileCount is called) +// +// Error Handling: Errors from eqVolumeFileCount are wrapped with context and propagated. +// The Do method logs these errors and continues processing to ensure other volumes are checked. +func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) { + pulseTimeAtSecond := vcd.now.Add(-constants.VolumePulsePeriod * 2).Unix() + doSyncDeletedCount := false + if vcd.syncDeletions && a.info.DeleteCount != b.info.DeleteCount { + doSyncDeletedCount = true + } + if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount { + // Do synchronization of volumes, if the modification time was before the last pulsation time + if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond { + return false, nil + } + eqFileCount, eqDeletedFileCount, err := vcd.eqVolumeFileCount(a, b) + if err != nil { + return false, fmt.Errorf("comparing volume %d file counts on %s and %s: %w", + a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + } + if eqFileCount { + if doSyncDeletedCount && !eqDeletedFileCount { + return false, nil + } + vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n", + a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) + } else { + return false, nil + } + } + return true, nil +} + +func (vcd *volumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica) (err error) { aHasChanges, bHasChanges := true, true const maxIterations = 5 iteration := 0 for (aHasChanges || bHasChanges) && iteration < maxIterations { iteration++ - if verbose { - fmt.Fprintf(c.writer, "sync iteration %d for volume %d\n", iteration, a.info.Id) - } + vcd.writeVerbose("sync iteration %d for volume %d\n", iteration, a.info.Id) prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges - if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil { + if aHasChanges, bHasChanges, err = vcd.checkBoth(a, b); err != nil { return err } // Detect if we're stuck in a loop with no progress if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) { - fmt.Fprintf(c.writer, "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", + vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, iteration) return fmt.Errorf("sync not making progress after %d iterations", iteration) } } if iteration >= maxIterations && (aHasChanges || bHasChanges) { - fmt.Fprintf(c.writer, "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", + vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id) return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) } @@ -219,7 +283,7 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl return nil } -func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (aHasChanges bool, bHasChanges bool, err error) { +func (vcd *volumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica) (aHasChanges bool, bHasChanges bool, err error) { aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() defer func() { aDB.Close() @@ -227,17 +291,16 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a }() // read index db - readIndexDbCutoffFrom := uint64(time.Now().UnixNano()) - if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil { + if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) } - if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil { + if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) } // find and make up the differences - aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) - bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + aHasChanges, err1 := vcd.doVolumeCheckDisk(bDB, aDB, b, a) + bHasChanges, err2 := vcd.doVolumeCheckDisk(aDB, bDB, a, b) if err1 != nil { return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1) } @@ -247,7 +310,7 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a return aHasChanges, bHasChanges, nil } -func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) { +func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica) (hasChanges bool, err error) { // find missing keys // hash join, can be more efficient @@ -255,6 +318,8 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo var partiallyDeletedNeedles []needle_map.NeedleValue var counter int doCutoffOfLastNeedle := true + cutoffFromAtNs := uint64(vcd.now.UnixNano()) + minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error { counter++ if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found { @@ -262,7 +327,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return nil } if doCutoffOfLastNeedle { - if needleMeta, err := readNeedleMeta(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { + if needleMeta, err := readNeedleMeta(vcd.grpcDialOption(), pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { // needles older than the cutoff time are not missing yet if needleMeta.AppendAtNs > cutoffFromAtNs { return nil @@ -282,7 +347,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return nil }) - fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", + vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { @@ -290,45 +355,40 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo } missingNeedlesFraction := float64(len(missingNeedles)) / float64(counter) - if missingNeedlesFraction > nonRepairThreshold { + if missingNeedlesFraction > vcd.nonRepairThreshold { return false, fmt.Errorf( "failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f", - source.info.Id, missingNeedlesFraction, nonRepairThreshold) + source.info.Id, missingNeedlesFraction, vcd.nonRepairThreshold) } for _, needleValue := range missingNeedles { - needleBlob, err := readSourceNeedleBlob(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) + needleBlob, err := vcd.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) if err != nil { return hasChanges, err } - if !applyChanges { + if !vcd.applyChanges { continue } - if verbose { - fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) - } - + vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) hasChanges = true - if err = writeNeedleBlobToTarget(grpcDialOption, pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { + if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { return hasChanges, err } } - if doSyncDeletions && applyChanges && len(partiallyDeletedNeedles) > 0 { + if vcd.syncDeletions && vcd.applyChanges && len(partiallyDeletedNeedles) > 0 { var fidList []string for _, needleValue := range partiallyDeletedNeedles { fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) - if verbose { - fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) - } + vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) } deleteResults := operation.DeleteFileIdsAtOneVolumeServer( pb.NewServerAddressFromDataNode(target.location.dataNode), - grpcDialOption, fidList, false) + vcd.grpcDialOption(), fidList, false) // Check for errors in results for _, deleteResult := range deleteResults { @@ -343,9 +403,9 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return hasChanges, nil } -func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { +func (vcd *volumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { - err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, vcd.grpcDialOption(), func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ VolumeId: volumeId, Offset: needleValue.Offset.ToActualOffset(), @@ -360,9 +420,9 @@ func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb. return } -func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { +func (vcd *volumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { - return operation.WithVolumeServerClient(false, targetVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, targetVolumeServer, vcd.grpcDialOption(), func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -371,25 +431,21 @@ func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer }) return err }) - } -func readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error { - +func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress) error { var buf bytes.Buffer - if err := copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer, grpcDialOption); err != nil { + if err := vcd.copyVolumeIndexFile(collection, volumeId, volumeServer, &buf); err != nil { return err } - if verbose { - fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) - } + vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } -func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error { +func (vcd *volumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer) error { - return operation.WithVolumeServerClient(true, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, volumeServer, vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" @@ -406,7 +462,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err) } - err = writeToBuffer(copyFileClient, buf) + err = vcd.writeToBuffer(copyFileClient, buf) if err != nil { return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, volumeServer, err) } @@ -416,7 +472,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser }) } -func writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error { +func (vcd *volumeCheckDisk) writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error { for { resp, receiveErr := client.Recv() if receiveErr == io.EOF { |
