aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fix_replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
-rw-r--r--weed/shell/command_volume_fix_replication.go23
1 files changed, 16 insertions, 7 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 29bfe3f76..f4dd0239a 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -16,7 +16,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
- "google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -204,22 +203,32 @@ func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[ui
type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica
-func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDialOption grpc.DialOption) (err error) {
+func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, commandEnv *CommandEnv) (err error) {
aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
defer func() {
aDB.Close()
bDB.Close()
}()
+ vcd := &volumeCheckDisk{
+ writer: writer,
+ commandEnv: commandEnv,
+ now: time.Now(),
+
+ verbose: false,
+ applyChanges: true,
+ syncDeletions: false,
+ nonRepairThreshold: float64(1),
+ }
+
// read index db
- readIndexDbCutoffFrom := uint64(time.Now().UnixNano())
- if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), false, writer, grpcDialOption); err != nil {
+ if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil {
return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
}
- if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), false, writer, grpcDialOption); err != nil {
+ if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil {
return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
}
- if _, err = doVolumeCheckDisk(aDB, bDB, a, b, false, writer, true, false, float64(1), readIndexDbCutoffFrom, grpcDialOption); err != nil {
+ if _, err = vcd.doVolumeCheckDisk(aDB, bDB, a, b); err != nil {
return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
}
return
@@ -271,7 +280,7 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
if replicaB.location.dataNode == replica.location.dataNode {
continue
}
- if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv.option.GrpcDialOption); checkErr != nil {
+ if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv); checkErr != nil {
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr)
break
}