Diffstat (limited to 'weed/shell')
-rw-r--r--  weed/shell/command_ec_common.go                190
-rw-r--r--  weed/shell/command_volume_check_disk.go        131
-rw-r--r--  weed/shell/command_volume_check_disk_test.go     6
-rw-r--r--  weed/shell/command_volume_server_evacuate.go    10
-rw-r--r--  weed/shell/common.go                             8
5 files changed, 277 insertions, 68 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index f059b4e74..f2cc581da 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -26,12 +26,25 @@ type DataCenterId string
type EcNodeId string
type RackId string
+// EcDisk represents a single disk on a volume server
+type EcDisk struct {
+ diskId uint32
+ diskType string
+ freeEcSlots int
+ ecShardCount int // Total EC shards on this disk
+ // Map of volumeId -> shardBits for shards on this disk
+ ecShards map[needle.VolumeId]erasure_coding.ShardBits
+}
+
type EcNode struct {
info *master_pb.DataNodeInfo
dc DataCenterId
rack RackId
freeEcSlot int
+ // disks maps diskId -> EcDisk for disk-level balancing
+ disks map[uint32]*EcDisk
}
+
type CandidateEcNode struct {
ecNode *EcNode
shardCount int
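
For orientation, the hunk above shifts EC accounting from per-server to per-disk: each EcNode now carries a disks map keyed by disk ID, and each EcDisk keeps a per-volume shard bitmap. A minimal sketch of that bookkeeping, using simplified stand-in types (the ShardBits and EcDisk below only mirror the fields exercised in this diff; they are not the real erasure_coding/shell types):

package main

import "fmt"

// ShardBits is a stand-in for erasure_coding.ShardBits: one bit per EC shard ID.
type ShardBits uint32

// ShardIdCount mirrors the method used in this diff: the number of set bits.
func (b ShardBits) ShardIdCount() (count int) {
	for ; b > 0; b >>= 1 {
		if b&1 != 0 {
			count++
		}
	}
	return
}

// EcDisk mirrors the struct introduced above, reduced to the fields needed here.
type EcDisk struct {
	diskId       uint32
	ecShardCount int
	ecShards     map[uint32]ShardBits // volumeId -> shards of that volume on this disk
}

func main() {
	disks := map[uint32]*EcDisk{
		0: {diskId: 0, ecShards: map[uint32]ShardBits{7: 0b00111}}, // shards 0,1,2 of volume 7
		1: {diskId: 1, ecShards: map[uint32]ShardBits{7: 0b11000}}, // shards 3,4 of volume 7
	}
	for id, d := range disks {
		for _, bits := range d.ecShards {
			d.ecShardCount += bits.ShardIdCount()
		}
		fmt.Printf("disk %d holds %d EC shards\n", id, d.ecShardCount)
	}
}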
@@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId)
if err != nil {
return err
}
@@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
return err
}
- fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ if destDiskId > 0 {
+ fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ }
}
@@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
- volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
+ volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
@@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(existingLocation),
+ DiskId: destDiskId,
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -410,12 +428,74 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
- ecNodes = append(ecNodes, &EcNode{
+ ecNode := &EcNode{
info: dn,
dc: dc,
rack: rack,
freeEcSlot: int(freeEcSlots),
- })
+ disks: make(map[uint32]*EcDisk),
+ }
+
+ // Build disk-level information from volumes and EC shards
+ // First, discover all unique disk IDs from VolumeInfos (includes empty disks)
+ allDiskIds := make(map[uint32]string) // diskId -> diskType
+ for diskType, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ // Get all disk IDs from volumes
+ for _, vi := range diskInfo.VolumeInfos {
+ allDiskIds[vi.DiskId] = diskType
+ }
+ // Also get disk IDs from EC shards
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ allDiskIds[ecShardInfo.DiskId] = diskType
+ }
+ }
+
+ // Group EC shards by disk_id
+ diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
+ for _, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ diskId := ecShardInfo.DiskId
+ if diskShards[diskId] == nil {
+ diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ vid := needle.VolumeId(ecShardInfo.Id)
+ diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ }
+ }
+
+ // Create EcDisk for each discovered disk
+ diskCount := len(allDiskIds)
+ if diskCount == 0 {
+ diskCount = 1
+ }
+ freePerDisk := int(freeEcSlots) / diskCount
+
+ for diskId, diskType := range allDiskIds {
+ shards := diskShards[diskId]
+ if shards == nil {
+ shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ totalShardCount := 0
+ for _, shardBits := range shards {
+ totalShardCount += shardBits.ShardIdCount()
+ }
+
+ ecNode.disks[diskId] = &EcDisk{
+ diskId: diskId,
+ diskType: diskType,
+ freeEcSlots: freePerDisk,
+ ecShardCount: totalShardCount,
+ ecShards: shards,
+ }
+ }
+
+ ecNodes = append(ecNodes, ecNode)
totalFreeEcSlots += freeEcSlots
})
return
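
The freePerDisk figure above is an even split of the server-level free-slot count, guarded so a node with no discovered disks still counts as one. A tiny standalone sketch of that estimate (splitFreeSlots is an illustrative name, not part of the diff):

package main

import "fmt"

// splitFreeSlots spreads a server-level free EC slot count evenly across its
// disks, mirroring the guard above: zero discovered disks is treated as one.
func splitFreeSlots(freeEcSlots int, diskCount int) int {
	if diskCount == 0 {
		diskCount = 1
	}
	return freeEcSlots / diskCount
}

func main() {
	fmt.Println(splitFreeSlots(40, 3)) // 13 per disk (integer division)
	fmt.Println(splitFreeSlots(40, 0)) // 40: node treated as a single disk
}

As far as this hunk shows, the per-disk figure is an estimate derived from the server-level count rather than a value reported per disk.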
@@ -884,10 +964,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
+ vid := needle.VolumeId(shards.Id)
+ destDiskId := pickBestDiskOnNode(emptyNode, vid)
- fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+ }
- err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
+ err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
if err != nil {
return err
}
@@ -957,18 +1043,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
if len(targets) == 0 {
return nil, errors.New(details)
}
+
+ // When multiple nodes have the same shard count, prefer nodes with better disk distribution
+ // (i.e., nodes with more disks that have fewer shards of this volume)
+ if len(targets) > 1 {
+ slices.SortFunc(targets, func(a, b *EcNode) int {
+ aScore := diskDistributionScore(a, vid)
+ bScore := diskDistributionScore(b, vid)
+ return aScore - bScore // Lower score is better
+ })
+ return targets[0], nil
+ }
+
return targets[rand.IntN(len(targets))], nil
}
+// diskDistributionScore calculates a score for how well-distributed shards are on the node's disks
+// Lower score is better (means more room for balanced distribution)
+func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
+ if len(ecNode.disks) == 0 {
+ return 0
+ }
+
+ // Sum the existing shard count for this volume on each disk
+ // Lower total means more room for new shards
+ score := 0
+ for _, disk := range ecNode.disks {
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily
+ }
+ score += disk.ecShardCount // Also consider total shards on disk
+ }
+ return score
+}
+
+// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
+// It prefers disks with fewer shards and more free slots
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+ if len(ecNode.disks) == 0 {
+ return 0 // No disk info available, let the server decide
+ }
+
+ var bestDiskId uint32
+ bestScore := -1
+
+ for diskId, disk := range ecNode.disks {
+ if disk.freeEcSlots <= 0 {
+ continue
+ }
+
+ // Check if this volume already has shards on this disk
+ existingShards := 0
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ existingShards = shardBits.ShardIdCount()
+ }
+
+ // Score: prefer disks with fewer total shards and fewer shards of this volume
+ // Lower score is better
+ score := disk.ecShardCount*10 + existingShards*100
+
+ if bestScore == -1 || score < bestScore {
+ bestScore = score
+ bestDiskId = diskId
+ }
+ }
+
+ return bestDiskId
+}
+
+// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
+func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) {
+ node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ diskId := pickBestDiskOnNode(node, vid)
+ return node, diskId, nil
+}
+
func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
- destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
+ destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
if err != nil {
- fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
+ fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
return nil
}
- fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
- return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+ }
+ return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
}
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
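
To make the disk-selection heuristic above concrete: pickBestDiskOnNode skips disks with no free slots and then minimizes ecShardCount*10 + existingShards*100, so a disk already holding shards of the same volume is penalized much harder than a generally busier disk. A self-contained sketch of the same rule with made-up numbers (diskStat and pickDisk are illustrative stand-ins):

package main

import "fmt"

type diskStat struct {
	id             uint32
	freeEcSlots    int
	ecShardCount   int // all EC shards on the disk
	shardsOfVolume int // shards of the volume being placed
}

// pickDisk applies the same preference as pickBestDiskOnNode in this diff:
// skip full disks, then minimize ecShardCount*10 + shardsOfVolume*100.
func pickDisk(disks []diskStat) (best uint32) {
	bestScore := -1
	for _, d := range disks {
		if d.freeEcSlots <= 0 {
			continue
		}
		score := d.ecShardCount*10 + d.shardsOfVolume*100
		if bestScore == -1 || score < bestScore {
			bestScore, best = score, d.id
		}
	}
	return best
}

func main() {
	disks := []diskStat{
		{id: 1, freeEcSlots: 5, ecShardCount: 8, shardsOfVolume: 1},  // score 180
		{id: 2, freeEcSlots: 5, ecShardCount: 12, shardsOfVolume: 0}, // score 120
		{id: 3, freeEcSlots: 0, ecShardCount: 1, shardsOfVolume: 0},  // skipped: no free slots
	}
	fmt.Println(pickDisk(disks)) // 2: zero shards of this volume outweighs the higher total count
}

With these weights, one existing shard of the target volume outweighs up to ten unrelated shards, which is what pushes shards of the same volume onto distinct disks.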
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index d7b015979..4d775000f 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -10,6 +10,8 @@ import (
"math"
"math/rand/v2"
"net/http"
+ "strings"
+ "sync"
"time"
"slices"
@@ -32,6 +34,7 @@ type commandVolumeCheckDisk struct{}
type volumeCheckDisk struct {
commandEnv *CommandEnv
writer io.Writer
+ writerMu sync.Mutex
now time.Time
slowMode bool
@@ -40,6 +43,8 @@ type volumeCheckDisk struct {
syncDeletions bool
fixReadOnly bool
nonRepairThreshold float64
+
+ ewg *ErrorWaitGroup
}
func (c *commandVolumeCheckDisk) Name() string {
@@ -59,9 +64,9 @@ func (c *commandVolumeCheckDisk) Help() string {
append entries in B and not in A to A
optionally, for each non-writable volume replica A
- if volume is not full
+ select a writable volume replica B
+ if entries in A don't match B
prune late volume entries not matching its index file
- select a writable volume replica B
append missing entries from B into A
mark the volume as writable (healthy)
@@ -92,6 +97,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)")
fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)")
syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
+ maxParallelization := fsckCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -115,6 +121,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
syncDeletions: *syncDeletions,
fixReadOnly: *fixReadOnly,
nonRepairThreshold: *nonRepairThreshold,
+
+ ewg: NewErrorWaitGroup(*maxParallelization),
}
// collect topology information
@@ -137,23 +145,21 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if err := vcd.checkWritableVolumes(volumeReplicas); err != nil {
return err
}
- if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
- return err
- }
+ vcd.checkReadOnlyVolumes(volumeReplicas)
- return nil
+ return vcd.ewg.Wait()
}
// checkWritableVolumes fixes volume replicas which are not read-only.
func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
- vcd.write("Pass #1 (writable volumes)\n")
+ vcd.write("Pass #1 (writable volumes)")
for _, replicas := range volumeReplicas {
// filter readonly replica
var writableReplicas []*VolumeReplica
for _, replica := range replicas {
if replica.info.ReadOnly {
- vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id)
+ vcd.write("skipping readonly volume %d on %s", replica.info.Id, replica.location.dataNode.Id)
} else {
writableReplicas = append(writableReplicas, replica)
}
@@ -166,16 +172,23 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo
a, b := writableReplicas[0], writableReplicas[1]
shouldSkip, err := vcd.shouldSkipVolume(a, b)
if err != nil {
- vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err)
+ vcd.write("error checking if volume %d should be skipped: %v", a.info.Id, err)
// Continue with sync despite error to be safe
} else if shouldSkip {
// always choose the larger volume to be the source
writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
continue
}
- if err := vcd.syncTwoReplicas(a, b, true); err != nil {
- vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
+
+ modified, err := vcd.syncTwoReplicas(a, b, true)
+ if err != nil {
+ vcd.write("failed to sync volumes %d on %s and %s: %v", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
+ } else {
+ if modified {
+ vcd.write("synced %s and %s for volume %d", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id)
+ }
}
+
// always choose the larger volume to be the source
if a.info.FileCount > b.info.FileCount {
writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
@@ -204,7 +217,7 @@ func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) er
return err
}
- vcd.write("volume %d on %s is now writable\n", vid, vr.location.dataNode.Id)
+ vcd.write("volume %d on %s is now writable", vid, vr.location.dataNode.Id)
return nil
}
@@ -224,15 +237,15 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er
return err
}
- vcd.write("volume %d on %s is now read-only\n", vid, vr.location.dataNode.Id)
+ vcd.write("volume %d on %s is now read-only", vid, vr.location.dataNode.Id)
return nil
}
-func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
+func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) {
if !vcd.fixReadOnly {
- return nil
+ return
}
- vcd.write("Pass #2 (read-only volumes)\n")
+ vcd.write("Pass #2 (read-only volumes)")
for vid, replicas := range volumeReplicas {
roReplicas := []*VolumeReplica{}
@@ -246,11 +259,11 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
}
}
if len(roReplicas) == 0 {
- vcd.write("no read-only replicas for volume %d\n", vid)
+ vcd.write("no read-only replicas for volume %d", vid)
continue
}
if len(rwReplicas) == 0 {
- vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid)
+ vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from", len(roReplicas), vid)
continue
}
@@ -261,35 +274,44 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
skip, err := vcd.shouldSkipVolume(r, source)
if err != nil {
- vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err)
+ vcd.ewg.AddErrorf("failed to check if volume %d should be skipped: %v", r.info.Id, err)
continue
}
if skip {
continue
}
- // make volume writable...
- if err := vcd.makeVolumeWritable(vid, r); err != nil {
- return err
- }
+ vcd.ewg.Add(func() error {
+ // make volume writable...
+ if err := vcd.makeVolumeWritable(vid, r); err != nil {
+ return err
+ }
- // ...fix it...
- // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
- if err := vcd.syncTwoReplicas(source, r, false); err != nil {
- vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
+ // ...try to fix it...
+ // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes...
+ modified, err := vcd.syncTwoReplicas(source, r, false)
+ if err != nil {
+ vcd.write("sync read-only volume %d on %s from %s: %v", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
- // ...or revert it back to read-only, if something went wrong.
- if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
- return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
+ if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
+ return fmt.Errorf("failed to revert volume %d on %s to readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
+ }
+ return err
+ } else {
+ if modified {
+ vcd.write("volume %d on %s is now synced to %d and writable", vid, r.location.dataNode.Id, source.location.dataNode.Id)
+ } else {
+ // ...or restore back to read-only, if no changes were made.
+ if err := vcd.makeVolumeReadonly(vid, r); err != nil {
+ return fmt.Errorf("failed to revert volume %d on %s to readonly: %v", vid, r.location.dataNode.Id, err)
+ }
+ }
}
- vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
- return err
- }
+ return nil
+ })
}
}
-
- return nil
}
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
@@ -297,12 +319,15 @@ func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
}
func (vcd *volumeCheckDisk) write(format string, a ...any) {
- fmt.Fprintf(vcd.writer, format, a...)
+ vcd.writerMu.Lock()
+ defer vcd.writerMu.Unlock()
+ fmt.Fprintf(vcd.writer, strings.TrimRight(format, "\r\n "), a...)
+ fmt.Fprint(vcd.writer, "\n")
}
func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) {
if vcd.verbose {
- fmt.Fprintf(vcd.writer, format, a...)
+ vcd.write(format, a...)
}
}
@@ -388,7 +413,7 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error)
if doSyncDeletedCount && !eqDeletedFileCount {
return false, nil
}
- vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n",
+ vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s",
a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
} else {
return false, nil
@@ -399,35 +424,39 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error)
// syncTwoReplicas attempts to sync all entries from a source volume replica into a target. If bi-directional mode
// is enabled, changes from target are also synced back into the source.
-func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (err error) {
+// Returns true if source and/or target were modified, false otherwise.
+func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (modified bool, err error) {
sourceHasChanges, targetHasChanges := true, true
const maxIterations = 5
iteration := 0
+ modified = false
+
for (sourceHasChanges || targetHasChanges) && iteration < maxIterations {
iteration++
- vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id)
+ vcd.writeVerbose("sync iteration %d/%d for volume %d", iteration, maxIterations, source.info.Id)
prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges
if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil {
- return err
+ return modified, err
}
+ modified = modified || sourceHasChanges || targetHasChanges
// Detect if we're stuck in a loop with no progress
if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) {
- vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
+ vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop",
source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration)
- return fmt.Errorf("sync not making progress after %d iterations", iteration)
+ return modified, fmt.Errorf("sync not making progress after %d iterations", iteration)
}
}
if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) {
- vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
+ vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention",
source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id)
- return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
+ return modified, fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
}
- return nil
+ return modified, nil
}
// checkBoth performs a sync between source and target volume replicas. If bi-directional mode is enabled, changes from target are also synced back into the source.
@@ -512,7 +541,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
return nil
})
- vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n",
+ vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries",
source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles))
if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) {
@@ -536,7 +565,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
continue
}
- vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
+ vcd.writeVerbose("read %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
hasChanges = true
if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil {
@@ -549,7 +578,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
var fidList []string
for _, needleValue := range partiallyDeletedNeedles {
fidList = append(fidList, needleValue.Key.FileId(source.info.Id))
- vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
+ vcd.writeVerbose("delete %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
}
deleteResults := operation.DeleteFileIdsAtOneVolumeServer(
pb.NewServerAddressFromDataNode(target.location.dataNode),
@@ -604,7 +633,7 @@ func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection s
return err
}
- vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer)
+ vcd.writeVerbose("load collection %s volume %d index size %d from %s ...", collection, volumeId, buf.Len(), volumeServer)
return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false)
}
@@ -616,7 +645,7 @@ func (vcd *volumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint
copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: volumeId,
- Ext: ".idx",
+ Ext: ext,
CompactionRevision: math.MaxUint32,
StopOffset: math.MaxInt64,
Collection: collection,
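
Most of the message churn in this file comes from the reworked write helper: it now serializes output behind writerMu, strips any trailing newline or spaces from the format, and always appends exactly one newline, so call sites no longer need "\n". A minimal stand-in sketch of that behavior (logger here is illustrative, not the shell type):

package main

import (
	"bytes"
	"fmt"
	"strings"
	"sync"
)

// logger is a simplified stand-in for volumeCheckDisk's write helper:
// serialize writers behind a mutex, strip trailing newline/space characters
// from the format, and always terminate with exactly one newline.
type logger struct {
	mu sync.Mutex
	w  *bytes.Buffer
}

func (l *logger) write(format string, a ...any) {
	l.mu.Lock()
	defer l.mu.Unlock()
	fmt.Fprintf(l.w, strings.TrimRight(format, "\r\n "), a...)
	fmt.Fprint(l.w, "\n")
}

func main() {
	l := &logger{w: &bytes.Buffer{}}
	l.write("volume %d synced\n", 42) // legacy call sites with a trailing \n...
	l.write("volume %d synced", 43)   // ...and new ones without produce the same shape
	fmt.Print(l.w.String())
	// volume 42 synced
	// volume 43 synced
}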
diff --git a/weed/shell/command_volume_check_disk_test.go b/weed/shell/command_volume_check_disk_test.go
index eee9103a8..ec958fbc4 100644
--- a/weed/shell/command_volume_check_disk_test.go
+++ b/weed/shell/command_volume_check_disk_test.go
@@ -278,14 +278,14 @@ func TestVolumeCheckDiskHelperMethods(t *testing.T) {
}
// Test write method
- vcd.write("test %s\n", "message")
+ vcd.write("test %s", "message")
if buf.String() != "test message\n" {
t.Errorf("write() output = %q, want %q", buf.String(), "test message\n")
}
// Test writeVerbose with verbose=true
buf.Reset()
- vcd.writeVerbose("verbose %d\n", 123)
+ vcd.writeVerbose("verbose %d", 123)
if buf.String() != "verbose 123\n" {
t.Errorf("writeVerbose() with verbose=true output = %q, want %q", buf.String(), "verbose 123\n")
}
@@ -293,7 +293,7 @@ func TestVolumeCheckDiskHelperMethods(t *testing.T) {
// Test writeVerbose with verbose=false
buf.Reset()
vcd.verbose = false
- vcd.writeVerbose("should not appear\n")
+ vcd.writeVerbose("should not appear")
if buf.String() != "" {
t.Errorf("writeVerbose() with verbose=false output = %q, want empty", buf.String())
}
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index 5c1805c89..6135eb3eb 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -197,8 +197,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
if ecShardInfo.Collection != "" {
collectionPrefix = ecShardInfo.Collection + "_"
}
- fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
- err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
+ vid := needle.VolumeId(ecShardInfo.Id)
+ destDiskId := pickBestDiskOnNode(emptyNode, vid)
+ if destDiskId > 0 {
+ fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
+ } else {
+ fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
+ }
+ err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
if err != nil {
return
} else {
diff --git a/weed/shell/common.go b/weed/shell/common.go
index 43571176e..cb2df5828 100644
--- a/weed/shell/common.go
+++ b/weed/shell/common.go
@@ -2,6 +2,7 @@ package shell
import (
"errors"
+ "fmt"
"sync"
)
@@ -64,6 +65,13 @@ func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
}()
}
+// AddErrorf records an error directly in the ErrorWaitGroup's results, without queueing any goroutines.
+func (ewg *ErrorWaitGroup) AddErrorf(format string, a ...interface{}) {
+ ewg.errorsMu.Lock()
+ ewg.errors = append(ewg.errors, fmt.Errorf(format, a...))
+ ewg.errorsMu.Unlock()
+}
+
// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them.
func (ewg *ErrorWaitGroup) Wait() error {
ewg.wg.Wait()
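
The check-disk changes above lean on this type: read-only volume fixes are queued with Add (bounded by -maxParallelization) and pre-check failures are recorded with the new AddErrorf, with everything surfaced by Wait. A simplified, self-contained stand-in showing that queue-then-wait pattern (the ewg type below is illustrative; it only mimics the behavior visible in this diff and uses errors.Join to aggregate):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// ewg is a simplified stand-in for weed/shell's ErrorWaitGroup: run queued
// tasks with bounded parallelism, let callers record errors directly, and
// report everything from Wait.
type ewg struct {
	sem  chan struct{}
	wg   sync.WaitGroup
	mu   sync.Mutex
	errs []error
}

func newEwg(maxParallel int) *ewg { return &ewg{sem: make(chan struct{}, maxParallel)} }

// Add queues a task; its error, if any, is collected for Wait.
func (e *ewg) Add(task func() error) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		e.sem <- struct{}{}
		defer func() { <-e.sem }()
		if err := task(); err != nil {
			e.mu.Lock()
			e.errs = append(e.errs, err)
			e.mu.Unlock()
		}
	}()
}

// AddErrorf records an error immediately, without spawning a goroutine,
// mirroring the helper added above.
func (e *ewg) AddErrorf(format string, a ...interface{}) {
	e.mu.Lock()
	e.errs = append(e.errs, fmt.Errorf(format, a...))
	e.mu.Unlock()
}

// Wait blocks until queued tasks finish, then joins all collected errors.
func (e *ewg) Wait() error {
	e.wg.Wait()
	return errors.Join(e.errs...)
}

func main() {
	g := newEwg(2)
	g.AddErrorf("volume %d: precheck failed", 7) // recorded without running a task
	for _, vid := range []int{1, 2, 3} {
		vid := vid
		g.Add(func() error {
			if vid == 2 {
				return fmt.Errorf("volume %d: sync failed", vid)
			}
			return nil
		})
	}
	fmt.Println(g.Wait()) // both errors surface together
}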