Diffstat (limited to 'weed/shell')
-rw-r--r--  weed/shell/command_bucket_delete.go                   2
-rw-r--r--  weed/shell/command_ec_balance.go                      2
-rw-r--r--  weed/shell/command_ec_common.go                      27
-rw-r--r--  weed/shell/command_fs_meta_cat.go                     6
-rw-r--r--  weed/shell/command_volume_balance.go                238
-rw-r--r--  weed/shell/command_volume_balance_test.go           155
-rw-r--r--  weed/shell/command_volume_fix_replication.go        191
-rw-r--r--  weed/shell/command_volume_fix_replication_test.go   176
-rw-r--r--  weed/shell/command_volume_server_evacuate.go        208
-rw-r--r--  weed/shell/command_volume_server_leave.go            67
-rw-r--r--  weed/shell/shell_liner.go                             2
11 files changed, 878 insertions, 196 deletions
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go
index 03c878e6a..02790b9e2 100644
--- a/weed/shell/command_bucket_delete.go
+++ b/weed/shell/command_bucket_delete.go
@@ -49,6 +49,6 @@ func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer i
return fmt.Errorf("read buckets: %v", err)
}
- return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false, 0)
+ return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil)
}
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 1ddb6a490..bb280b7d9 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -28,7 +28,7 @@ func (c *commandEcBalance) Help() string {
Algorithm:
- For each type of volume server (different max volume count limit){
+ func EcBalance() {
for each collection:
balanceEcVolumes(collectionName)
for each rack:
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 0db119d3c..a808335eb 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -173,6 +173,16 @@ type EcNode struct {
freeEcSlot int
}
+func (ecNode *EcNode) localShardIdCount(vid uint32) int {
+ for _, ecShardInfo := range ecNode.info.EcShardInfos {
+ if vid == ecShardInfo.Id {
+ shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ return shardBits.ShardIdCount()
+ }
+ }
+ return 0
+}
+
type EcRack struct {
ecNodes map[EcNodeId]*EcNode
freeEcSlot int
@@ -191,7 +201,15 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes
}
// find out all volume servers with one slot left.
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(resp.TopologyInfo, selectedDataCenter)
+
+ sortEcNodesByFreeslotsDecending(ecNodes)
+
+ return
+}
+
+func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
+ eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != dc {
return
}
@@ -205,9 +223,6 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes
})
totalFreeEcSlots += freeEcSlots
})
-
- sortEcNodesByFreeslotsDecending(ecNodes)
-
return
}
@@ -253,6 +268,10 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
})
}
+func divide(total, n int) float64 {
+ return float64(total) / float64(n)
+}
+
func ceilDivide(total, n int) int {
return int(math.Ceil(float64(total) / float64(n)))
}
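
For reference, the new divide helper returns a fractional fill ratio, while the existing ceilDivide returns a rounded-up integer target. A minimal standalone sketch (the capacities are made up) of why the balancer below switches from a per-server count to a ratio:

package main

import (
	"fmt"
	"math"
)

func divide(total, n int) float64 {
	return float64(total) / float64(n)
}

func ceilDivide(total, n int) int {
	return int(math.Ceil(float64(total) / float64(n)))
}

func main() {
	selected := 25
	numServers := 3
	totalMaxSlots := 10 + 10 + 20 // three servers with different max volume counts

	// old target: an integer count per server, blind to each server's capacity
	fmt.Println("ideal volumes per server:", ceilDivide(selected, numServers)) // 9

	// new target: a fill ratio, so a 20-slot server may hold twice the volumes of a 10-slot one
	fmt.Println("ideal fill ratio:", divide(selected, totalMaxSlots)) // 0.625
}
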
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index 8cba2d520..a097a3a4e 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -2,6 +2,7 @@ package shell
import (
"fmt"
+ "github.com/golang/protobuf/proto"
"io"
"sort"
@@ -69,6 +70,11 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
fmt.Fprintf(writer, "%s\n", text)
+ bytes, _ := proto.Marshal(respLookupEntry.Entry)
+ gzippedBytes, _ := util.GzipData(bytes)
+ zstdBytes, _ := util.ZstdData(bytes)
+ fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes))
+
return nil
})
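
The added lines above serialize the entry with proto.Marshal and report the raw, gzip, and zstd sizes, which is useful for judging how much filer metadata would shrink under compression. A rough standalone equivalent using only the standard library (gzip only; util.ZstdData is SeaweedFS-specific), with an invented byte slice standing in for the marshalled entry:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipSize compresses data in memory and returns the compressed length.
func gzipSize(data []byte) int {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil { // flush before reading the length
		panic(err)
	}
	return buf.Len()
}

func main() {
	meta := bytes.Repeat([]byte("chunk-fid-and-offsets "), 100) // stand-in for proto.Marshal output
	fmt.Printf("meta size: %d gzip: %d\n", len(meta), gzipSize(meta))
}
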
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 69e3c7fd9..53222ca29 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"io"
"os"
"sort"
@@ -39,14 +40,15 @@ func (c *commandVolumeBalance) Help() string {
}
func balanceWritableVolumes(){
- idealWritableVolumes = totalWritableVolumes / numVolumeServers
+ idealWritableVolumeRatio = totalWritableVolumes / totalNumberOfMaxVolumes
for hasMovedOneVolume {
- sort all volume servers ordered by the number of local writable volumes
- pick the volume server A with the lowest number of writable volumes x
- pick the volume server B with the highest number of writable volumes y
- if y > idealWritableVolumes and x +1 <= idealWritableVolumes {
- if B has a writable volume id v that A does not have {
- move writable volume v from A to B
+ sort all volume servers ordered by the localWritableVolumeRatio = localWritableVolumes to localVolumeMax
+ pick the volume server B with the highest localWritableVolumeRatio y
+ for any the volume server A with the number of writable volumes x + 1 <= idealWritableVolumeRatio * localVolumeMax {
+ if y > localWritableVolumeRatio {
+ if B has a writable volume id v that A does not have, and satisfy v replication requirements {
+ move writable volume v from A to B
+ }
}
}
}
@@ -81,38 +83,33 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return err
}
- typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc)
+ volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc)
+ volumeReplicas, _ := collectVolumeReplicaLocations(resp)
- for maxVolumeCount, volumeServers := range typeToNodes {
- if len(volumeServers) < 2 {
- fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount)
- continue
+ if *collection == "EACH_COLLECTION" {
+ collections, err := ListCollectionNames(commandEnv, true, false)
+ if err != nil {
+ return err
}
- if *collection == "EACH_COLLECTION" {
- collections, err := ListCollectionNames(commandEnv, true, false)
- if err != nil {
- return err
- }
- for _, c := range collections {
- if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
- return err
- }
- }
- } else if *collection == "ALL_COLLECTIONS" {
- if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
- return err
- }
- } else {
- if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
+ for _, c := range collections {
+ if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
return err
}
}
-
+ } else if *collection == "ALL_COLLECTIONS" {
+ if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
+ return err
+ }
+ } else {
+ if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
+ return err
+ }
}
+
return nil
}
-func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
+func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
// balance writable volumes
for _, n := range nodes {
@@ -125,7 +122,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit
return !v.ReadOnly && v.Size < volumeSizeLimit
})
}
- if err := balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing); err != nil {
+ if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
return err
}
@@ -140,22 +137,21 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit
return v.ReadOnly || v.Size >= volumeSizeLimit
})
}
- if err := balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
+ if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
return err
}
return nil
}
-func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) {
- typeToNodes = make(map[uint64][]*Node)
+func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter string) (nodes []*Node) {
for _, dc := range t.DataCenterInfos {
if selectedDataCenter != "" && dc.Id != selectedDataCenter {
continue
}
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
- typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{
+ nodes = append(nodes, &Node{
info: dn,
dc: dc.Id,
rack: r.Id,
@@ -173,6 +169,23 @@ type Node struct {
rack string
}
+func (n *Node) localVolumeRatio() float64 {
+ return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount))
+}
+
+func (n *Node) localVolumeNextRatio() float64 {
+ return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount))
+}
+
+func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
+ n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
+ for _, v := range n.info.VolumeInfos {
+ if fn(v) {
+ n.selectedVolumes[v.Id] = v
+ }
+ }
+}
+
func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
sort.Slice(volumes, func(i, j int) bool {
return volumes[i].Size < volumes[j].Size
@@ -185,73 +198,146 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
})
}
-func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error {
- selectedVolumeCount := 0
+func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
+ selectedVolumeCount, volumeMaxCount := 0, 0
for _, dn := range nodes {
selectedVolumeCount += len(dn.selectedVolumes)
+ volumeMaxCount += int(dn.info.MaxVolumeCount)
}
- idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes))
+ idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount)
- hasMove := true
+ hasMoved := true
- for hasMove {
- hasMove = false
+ for hasMoved {
+ hasMoved = false
sort.Slice(nodes, func(i, j int) bool {
- // TODO sort by free volume slots???
- return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes)
+ return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio()
})
- emptyNode, fullNode := nodes[0], nodes[len(nodes)-1]
- if len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes {
- // sort the volumes to move
- var candidateVolumes []*master_pb.VolumeInformationMessage
- for _, v := range fullNode.selectedVolumes {
- candidateVolumes = append(candidateVolumes, v)
+ fullNode := nodes[len(nodes)-1]
+ var candidateVolumes []*master_pb.VolumeInformationMessage
+ for _, v := range fullNode.selectedVolumes {
+ candidateVolumes = append(candidateVolumes, v)
+ }
+ sortCandidatesFn(candidateVolumes)
+
+ for i := 0; i < len(nodes)-1; i++ {
+ emptyNode := nodes[i]
+ if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) {
+ // no more volume servers with empty slots
+ break
}
- sortCandidatesFn(candidateVolumes)
-
- for _, v := range candidateVolumes {
- if v.ReplicaPlacement > 0 {
- if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack {
- // TODO this logic is too simple, but should work most of the time
- // Need a correct algorithm to handle all different cases
- continue
- }
- }
- if _, found := emptyNode.selectedVolumes[v.Id]; !found {
- if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil {
- delete(fullNode.selectedVolumes, v.Id)
- emptyNode.selectedVolumes[v.Id] = v
- hasMove = true
- break
- } else {
- return err
- }
- }
+ hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing)
+ if err != nil {
+ return
+ }
+ if hasMoved {
+ // moved one volume
+ break
}
}
}
return nil
}
-func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error {
+func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) {
+
+ for _, v := range candidateVolumes {
+ hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, fullNode, v, emptyNode, applyBalancing)
+ if err != nil {
+ return
+ }
+ if hasMoved {
+ break
+ }
+ }
+ return
+}
+
+func maybeMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolume *master_pb.VolumeInformationMessage, emptyNode *Node, applyChange bool) (hasMoved bool, err error) {
+
+ if candidateVolume.ReplicaPlacement > 0 {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(candidateVolume.ReplicaPlacement))
+ if !isGoodMove(replicaPlacement, volumeReplicas[candidateVolume.Id], fullNode, emptyNode) {
+ return false, nil
+ }
+ }
+ if _, found := emptyNode.selectedVolumes[candidateVolume.Id]; !found {
+ if err = moveVolume(commandEnv, candidateVolume, fullNode, emptyNode, applyChange); err == nil {
+ adjustAfterMove(candidateVolume, volumeReplicas, fullNode, emptyNode)
+ return true, nil
+ } else {
+ return
+ }
+ }
+ return
+}
+
+func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyChange bool) error {
collectionPrefix := v.Collection + "_"
if v.Collection == "" {
collectionPrefix = ""
}
fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
- if applyBalancing {
+ if applyChange {
return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
}
return nil
}
-func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
- node.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
- for _, v := range node.info.VolumeInfos {
- if fn(v) {
- node.selectedVolumes[v.Id] = v
+func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*VolumeReplica, sourceNode, targetNode *Node) bool {
+ for _, replica := range existingReplicas {
+ if replica.location.dataNode.Id == targetNode.info.Id &&
+ replica.location.rack == targetNode.rack &&
+ replica.location.dc == targetNode.dc {
+ // never move to existing nodes
+ return false
+ }
+ }
+ dcs, racks := make(map[string]bool), make(map[string]int)
+ for _, replica := range existingReplicas {
+ if replica.location.dataNode.Id != sourceNode.info.Id {
+ dcs[replica.location.DataCenter()] = true
+ racks[replica.location.Rack()]++
+ }
+ }
+
+ dcs[targetNode.dc] = true
+ racks[fmt.Sprintf("%s %s", targetNode.dc, targetNode.rack)]++
+
+ if len(dcs) > placement.DiffDataCenterCount+1 {
+ return false
+ }
+
+ if len(racks) > placement.DiffRackCount+placement.DiffDataCenterCount+1 {
+ return false
+ }
+
+ for _, sameRackCount := range racks {
+ if sameRackCount > placement.SameRackCount+1 {
+ return false
+ }
+ }
+
+ return true
+
+}
+
+func adjustAfterMove(v *master_pb.VolumeInformationMessage, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, emptyNode *Node) {
+ delete(fullNode.selectedVolumes, v.Id)
+ if emptyNode.selectedVolumes != nil {
+ emptyNode.selectedVolumes[v.Id] = v
+ }
+ existingReplicas := volumeReplicas[v.Id]
+ for _, replica := range existingReplicas {
+ if replica.location.dataNode.Id == fullNode.info.Id &&
+ replica.location.rack == fullNode.rack &&
+ replica.location.dc == fullNode.dc {
+ replica.location.dc = emptyNode.dc
+ replica.location.rack = emptyNode.rack
+ replica.location.dataNode = emptyNode.info
+ return
}
}
}
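
isGoodMove above caps the distinct data centers, the distinct (dc, rack) pairs, and the copies per rack that a move may produce, all derived from the volume's xyz replication setting. A small sketch, using the same super_block helpers this diff already imports, of how those caps fall out of the placement digits for replication "210":

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

func main() {
	// "210" = 2 other data centers, 1 other rack in the primary DC, 0 extra copies on the same rack.
	placement, err := super_block.NewReplicaPlacementFromString("210")
	if err != nil {
		panic(err)
	}
	fmt.Println("total copies:", placement.GetCopyCount())                                        // 4
	fmt.Println("max data centers:", placement.DiffDataCenterCount+1)                             // 3
	fmt.Println("max (dc, rack) pairs:", placement.DiffRackCount+placement.DiffDataCenterCount+1) // 4
	fmt.Println("max copies per rack:", placement.SameRackCount+1)                                // 1
}
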
diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go
new file mode 100644
index 000000000..9e154dc00
--- /dev/null
+++ b/weed/shell/command_volume_balance_test.go
@@ -0,0 +1,155 @@
+package shell
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+type testMoveCase struct {
+ name string
+ replication string
+ replicas []*VolumeReplica
+ sourceLocation location
+ targetLocation location
+ expected bool
+}
+
+func TestIsGoodMove(t *testing.T) {
+
+ var tests = []testMoveCase{
+
+ {
+ name: "test 100 move to spread into proper data centers",
+ replication: "100",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+
+ {
+ name: "test move to the same node",
+ replication: "001",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: false,
+ },
+
+ {
+ name: "test move to the same rack, but existing node",
+ replication: "001",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ expected: false,
+ },
+
+ {
+ name: "test move to the same rack, a new node",
+ replication: "001",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+
+ {
+ name: "test 010 move all to the same rack",
+ replication: "010",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: false,
+ },
+
+ {
+ name: "test 010 move to spread racks",
+ replication: "010",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+
+ {
+ name: "test 010 move to spread racks",
+ replication: "010",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+ }
+
+ for _, tt := range tests {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
+ println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name)
+ sourceNode := &Node{
+ info: tt.sourceLocation.dataNode,
+ dc: tt.sourceLocation.dc,
+ rack: tt.sourceLocation.rack,
+ }
+ targetNode := &Node{
+ info: tt.targetLocation.dataNode,
+ dc: tt.targetLocation.dc,
+ rack: tt.targetLocation.rack,
+ }
+ if isGoodMove(replicaPlacement, tt.replicas, sourceNode, targetNode) != tt.expected {
+ t.Errorf("%s: expect %v move from %v to %s, replication:%v",
+ tt.name, tt.expected, tt.sourceLocation, tt.targetLocation, tt.replication)
+ }
+ }
+
+}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index e17f35c67..b32ccaaab 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -3,8 +3,8 @@ package shell
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
- "math/rand"
"sort"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -27,16 +27,18 @@ func (c *commandVolumeFixReplication) Name() string {
func (c *commandVolumeFixReplication) Help() string {
return `add replicas to volumes that are missing replicas
- This command finds all under-replicated volumes, and finds volume servers with free slots.
+ This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
+
+ This command also finds all under-replicated volumes, and finds volume servers with free slots.
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
volume.fix.replication -n # do not take action
- volume.fix.replication # actually copying the volume files and mount the volume
+ volume.fix.replication # actually deleting or copying the volume files and mount the volume
Note:
* each time this will only add back one replica for one volume id. If there are multiple replicas
are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
- * do not run this too quick within seconds, since the new volume replica may take a few seconds
+ * do not run this too quickly within seconds, since the new volume replica may take a few seconds
to register itself to the master.
`
@@ -64,53 +66,89 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find all volumes that needs replication
// collect all data nodes
- replicatedVolumeLocations := make(map[uint32][]location)
- replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
+ volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)
+
+ if len(allLocations) == 0 {
+ return fmt.Errorf("no data nodes at all")
+ }
+
+ // find all under replicated volumes
+ var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
+ for vid, replicas := range volumeReplicas {
+ replica := replicas[0]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ if replicaPlacement.GetCopyCount() > len(replicas) {
+ underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+ } else if replicaPlacement.GetCopyCount() < len(replicas) {
+ overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
+ fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+ }
+ }
+
+ if len(overReplicatedVolumeIds) > 0 {
+ return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
+ }
+
+ if len(underReplicatedVolumeIds) == 0 {
+ return nil
+ }
+
+ // find the most under populated data nodes
+ keepDataNodesSorted(allLocations)
+
+ return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
+
+}
+
+func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
+ volumeReplicas := make(map[uint32][]*VolumeReplica)
var allLocations []location
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, v := range dn.VolumeInfos {
- if v.ReplicaPlacement > 0 {
- replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
- replicatedVolumeInfo[v.Id] = v
- }
+ volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
+ location: &loc,
+ info: v,
+ })
}
allLocations = append(allLocations, loc)
})
+ return volumeReplicas, allLocations
+}
- // find all under replicated volumes
- underReplicatedVolumeLocations := make(map[uint32][]location)
- for vid, locations := range replicatedVolumeLocations {
- volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
- if replicaPlacement.GetCopyCount() > len(locations) {
- underReplicatedVolumeLocations[vid] = locations
+func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+ for _, vid := range overReplicatedVolumeIds {
+ replicas := volumeReplicas[vid]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
+
+ replica := pickOneReplicaToDelete(replicas, replicaPlacement)
+
+ fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
+
+ if !takeAction {
+ break
}
- }
- if len(underReplicatedVolumeLocations) == 0 {
- return fmt.Errorf("no under replicated volumes")
- }
+ if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
+ return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
+ }
- if len(allLocations) == 0 {
- return fmt.Errorf("no data nodes at all")
}
+ return nil
+}
- // find the most under populated data nodes
- keepDataNodesSorted(allLocations)
-
- for vid, locations := range underReplicatedVolumeLocations {
- volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+ for _, vid := range underReplicatedVolumeIds {
+ replicas := volumeReplicas[vid]
+ replica := pickOneReplicaToCopyFrom(replicas)
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
- if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
+ if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// ask the volume server to replicate the volume
- sourceNodes := underReplicatedVolumeLocations[vid]
- sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
foundNewLocation = true
- fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
+ fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
if !takeAction {
break
@@ -118,11 +156,11 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
- VolumeId: volumeInfo.Id,
- SourceDataNode: sourceNode.dataNode.Id,
+ VolumeId: replica.info.Id,
+ SourceDataNode: replica.location.dataNode.Id,
})
if replicateErr != nil {
- return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr)
+ return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
return nil
})
@@ -138,11 +176,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
}
if !foundNewLocation {
- fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
+ fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
}
}
-
return nil
}
@@ -182,22 +219,15 @@ func keepDataNodesSorted(dataNodes []location) {
return false
}
*/
-func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
+func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
- existingDataNodes := make(map[string]int)
- for _, loc := range existingLocations {
- existingDataNodes[loc.String()] += 1
- }
- sameDataNodeCount := existingDataNodes[possibleLocation.String()]
- // avoid duplicated volume on the same data node
- if sameDataNodeCount > 0 {
+ existingDataCenters, _, existingDataNodes := countReplicas(replicas)
+
+ if _, found := existingDataNodes[possibleLocation.String()]; found {
+ // avoid duplicated volume on the same data node
return false
}
- existingDataCenters := make(map[string]int)
- for _, loc := range existingLocations {
- existingDataCenters[loc.DataCenter()] += 1
- }
primaryDataCenters, _ := findTopKeys(existingDataCenters)
// ensure data center count is within limit
@@ -218,20 +248,20 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi
}
// now this is one of the primary dcs
- existingRacks := make(map[string]int)
- for _, loc := range existingLocations {
- if loc.DataCenter() != possibleLocation.DataCenter() {
+ primaryDcRacks := make(map[string]int)
+ for _, replica := range replicas {
+ if replica.location.DataCenter() != possibleLocation.DataCenter() {
continue
}
- existingRacks[loc.Rack()] += 1
+ primaryDcRacks[replica.location.Rack()] += 1
}
- primaryRacks, _ := findTopKeys(existingRacks)
- sameRackCount := existingRacks[possibleLocation.Rack()]
+ primaryRacks, _ := findTopKeys(primaryDcRacks)
+ sameRackCount := primaryDcRacks[possibleLocation.Rack()]
// ensure rack count is within limit
- if _, found := existingRacks[possibleLocation.Rack()]; !found {
+ if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
// different from existing racks
- if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
+ if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
// lack on different racks
return true
} else {
@@ -280,6 +310,11 @@ func isAmong(key string, keys []string) bool {
return false
}
+type VolumeReplica struct {
+ location *location
+ info *master_pb.VolumeInformationMessage
+}
+
type location struct {
dc string
rack string
@@ -305,3 +340,43 @@ func (l location) Rack() string {
func (l location) DataCenter() string {
return l.dc
}
+
+func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
+ mostRecent := replicas[0]
+ for _, replica := range replicas {
+ if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
+ mostRecent = replica
+ }
+ }
+ return mostRecent
+}
+
+func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
+ diffDc = make(map[string]int)
+ diffRack = make(map[string]int)
+ diffNode = make(map[string]int)
+ for _, replica := range replicas {
+ diffDc[replica.location.DataCenter()] += 1
+ diffRack[replica.location.Rack()] += 1
+ diffNode[replica.location.String()] += 1
+ }
+ return
+}
+
+func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
+
+ allSame := true
+ oldest := replicas[0]
+ for _, replica := range replicas {
+ if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond {
+ oldest = replica
+ allSame = false
+ }
+ }
+ if !allSame {
+ return oldest
+ }
+
+ // TODO what if all the replicas have the same timestamp?
+ return oldest
+}
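
The new classification only compares the placement's expected copy count with the number of replicas found in the topology; anything above triggers fixOverReplicatedVolumes, anything below fixUnderReplicatedVolumes. A minimal sketch of that decision, using the same super_block parser (the replication strings and counts are illustrative):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

// classify reports whether a volume with the given replication string and
// observed replica count is under-, over-, or correctly replicated.
func classify(replication string, replicaCount int) string {
	placement, _ := super_block.NewReplicaPlacementFromString(replication)
	switch {
	case placement.GetCopyCount() > replicaCount:
		return "under-replicated"
	case placement.GetCopyCount() < replicaCount:
		return "over-replicated"
	default:
		return "ok"
	}
}

func main() {
	fmt.Println(classify("010", 1)) // under-replicated: wants 2 copies, has 1
	fmt.Println(classify("010", 3)) // over-replicated: wants 2 copies, has 3
	fmt.Println(classify("000", 1)) // ok
}
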
diff --git a/weed/shell/command_volume_fix_replication_test.go b/weed/shell/command_volume_fix_replication_test.go
index 4cfbd96aa..bb61be1ef 100644
--- a/weed/shell/command_volume_fix_replication_test.go
+++ b/weed/shell/command_volume_fix_replication_test.go
@@ -8,11 +8,11 @@ import (
)
type testcase struct {
- name string
- replication string
- existingLocations []location
- possibleLocation location
- expected bool
+ name string
+ replication string
+ replicas []*VolumeReplica
+ possibleLocation location
+ expected bool
}
func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
@@ -21,8 +21,10 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 100 negative",
replication: "100",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
},
possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
expected: false,
@@ -30,8 +32,10 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 100 positive",
replication: "100",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
},
possibleLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
expected: true,
@@ -39,10 +43,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 022 positive",
replication: "022",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: true,
@@ -50,10 +60,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 022 negative",
replication: "022",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: false,
@@ -61,10 +77,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 210 moved from 200 positive",
replication: "210",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: true,
@@ -72,10 +94,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 210 moved from 200 negative extra dc",
replication: "210",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc4", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: false,
@@ -83,10 +111,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 210 moved from 200 negative extra data node",
replication: "210",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: false,
@@ -103,9 +137,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) {
{
name: "test 011 same existing rack",
replication: "011",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: true,
@@ -113,9 +151,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) {
{
name: "test 011 negative",
replication: "011",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: false,
@@ -123,9 +165,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) {
{
name: "test 011 different existing racks",
replication: "011",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: true,
@@ -133,9 +179,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) {
{
name: "test 011 different existing racks negative",
replication: "011",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: false,
@@ -152,8 +202,10 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) {
{
name: "test 001",
replication: "001",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
expected: true,
@@ -161,9 +213,13 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) {
{
name: "test 002 positive",
replication: "002",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: true,
@@ -171,9 +227,13 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) {
{
name: "test 002 negative, repeat the same node",
replication: "002",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
expected: false,
@@ -181,10 +241,16 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) {
{
name: "test 002 negative, enough node already",
replication: "002",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: false,
@@ -199,9 +265,9 @@ func runTests(tests []testcase, t *testing.T) {
for _, tt := range tests {
replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name)
- if satisfyReplicaPlacement(replicaPlacement, tt.existingLocations, tt.possibleLocation) != tt.expected {
+ if satisfyReplicaPlacement(replicaPlacement, tt.replicas, tt.possibleLocation) != tt.expected {
t.Errorf("%s: expect %v add %v to %s %+v",
- tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.existingLocations)
+ tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.replicas)
}
}
}
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
new file mode 100644
index 000000000..214783ee1
--- /dev/null
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -0,0 +1,208 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "io"
+ "sort"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeServerEvacuate{})
+}
+
+type commandVolumeServerEvacuate struct {
+}
+
+func (c *commandVolumeServerEvacuate) Name() string {
+ return "volumeServer.evacuate"
+}
+
+func (c *commandVolumeServerEvacuate) Help() string {
+ return `move out all data on a volume server
+
+ volumeServer.evacuate -node <host:port>
+
+ This command moves all data away from the volume server.
+ The volumes on the volume servers will be redistributed.
+
+ Usually this is used to prepare to shutdown or upgrade the volume server.
+
+ Sometimes a volume can not be moved because there are no
+ good destination to meet the replication requirement.
+ E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
+ You can use "-skipNonMoveable" to move the rest volumes.
+
+`
+}
+
+func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
+ skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
+ applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
+ if err = vsEvacuateCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *volumeServer == "" {
+ return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
+ }
+
+ return volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer)
+
+}
+
+func volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
+ // 1. confirm the volume server is part of the cluster
+ // 2. collect all other volume servers, sort by empty slots
+ // 3. move to any other volume server as long as it satisfy the replication requirements
+
+ // list all the volumes
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return err
+ }
+
+ if err := evacuateNormalVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
+ return err
+ }
+
+ if err := evacuateEcVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
+ // find this volume server
+ volumeServers := collectVolumeServersByDc(resp.TopologyInfo, "")
+ thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer)
+ if thisNode == nil {
+ return fmt.Errorf("%s is not found in this cluster", volumeServer)
+ }
+
+ // move away normal volumes
+ volumeReplicas, _ := collectVolumeReplicaLocations(resp)
+ for _, vol := range thisNode.info.VolumeInfos {
+ hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
+ if err != nil {
+ return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err)
+ }
+ if !hasMoved {
+ if skipNonMoveable {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
+ fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
+ } else {
+ return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
+ }
+ }
+ }
+ return nil
+}
+
+func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
+ // find this ec volume server
+ ecNodes, _ := collectEcVolumeServersByDc(resp.TopologyInfo, "")
+ thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer)
+ if thisNode == nil {
+ return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
+ }
+
+ // move away ec volumes
+ for _, ecShardInfo := range thisNode.info.EcShardInfos {
+ hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
+ if err != nil {
+ return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
+ }
+ if !hasMoved {
+ if skipNonMoveable {
+ fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
+ } else {
+ return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
+ }
+ }
+ }
+ return nil
+}
+
+func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
+
+ for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
+
+ sort.Slice(otherNodes, func(i, j int) bool {
+ return otherNodes[i].localShardIdCount(ecShardInfo.Id) < otherNodes[j].localShardIdCount(ecShardInfo.Id)
+ })
+
+ for i := 0; i < len(otherNodes); i++ {
+ emptyNode := otherNodes[i]
+ err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
+ if err != nil {
+ return
+ } else {
+ hasMoved = true
+ break
+ }
+ }
+ if !hasMoved {
+ return
+ }
+ }
+
+ return
+}
+
+func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
+ sort.Slice(otherNodes, func(i, j int) bool {
+ return otherNodes[i].localVolumeRatio() < otherNodes[j].localVolumeRatio()
+ })
+
+ for i := 0; i < len(otherNodes); i++ {
+ emptyNode := otherNodes[i]
+ hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
+ if err != nil {
+ return
+ }
+ if hasMoved {
+ break
+ }
+ }
+ return
+}
+
+func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) {
+ for _, node := range volumeServers {
+ if node.info.Id == thisServer {
+ thisNode = node
+ continue
+ }
+ otherNodes = append(otherNodes, node)
+ }
+ return
+}
+
+func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) {
+ for _, node := range volumeServers {
+ if node.info.Id == thisServer {
+ thisNode = node
+ continue
+ }
+ otherNodes = append(otherNodes, node)
+ }
+ return
+}
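
Taken together with the balance helpers above, a typical evacuation session in weed shell would look roughly like the following (the host:port is an example; without -force the command only prints the planned moves):

> lock
> volumeServer.evacuate -node 192.168.1.5:8080                          # dry run: print planned moves
> volumeServer.evacuate -node 192.168.1.5:8080 -force                   # actually move volumes and EC shards away
> volumeServer.evacuate -node 192.168.1.5:8080 -skipNonMoveable -force  # tolerate volumes with no valid destination
> unlock
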
diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go
new file mode 100644
index 000000000..2a2e56e86
--- /dev/null
+++ b/weed/shell/command_volume_server_leave.go
@@ -0,0 +1,67 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "google.golang.org/grpc"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeServerLeave{})
+}
+
+type commandVolumeServerLeave struct {
+}
+
+func (c *commandVolumeServerLeave) Name() string {
+ return "volumeServer.leave"
+}
+
+func (c *commandVolumeServerLeave) Help() string {
+ return `stop a volume server from sending heartbeats to the master
+
+ volume.unmount -node <volume server host:port> -force
+
+ This command enables gracefully shutting down the volume server.
+ The volume server will stop sending heartbeats to the master.
+ After draining the traffic for a few seconds, you can safely shut down the volume server.
+
+ This operation is not revocable unless the volume server is restarted.
+`
+}
+
+func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ vsLeaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeServer := vsLeaveCommand.String("node", "", "<host>:<port> of the volume server")
+ if err = vsLeaveCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *volumeServer == "" {
+ return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
+ }
+
+ return volumeServerLeave(commandEnv.option.GrpcDialOption, *volumeServer, writer)
+
+}
+
+func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer string, writer io.Writer) (err error) {
+ return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
+ if leaveErr != nil {
+ fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr)
+ } else {
+ fmt.Fprintf(writer, "stopped heartbeat in volume server %s. After a few seconds to drain traffic, it will be safe to stop the volume server.\n", volumeServer)
+ }
+ return leaveErr
+ })
+}
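
Note that the usage line in the help text above still reads volume.unmount, while the command registers itself as volumeServer.leave (see Name()). A rough drain-then-stop sequence in weed shell, assuming the server was first emptied with volumeServer.evacuate (the host:port is an example):

> lock
> volumeServer.evacuate -node 192.168.1.5:8080 -force   # optional: move the data off first
> volumeServer.leave -node 192.168.1.5:8080             # stop heartbeats to the master
> unlock

Then wait a few seconds for traffic to drain before stopping the volume server process.
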
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 4632a1fb0..2d5166acf 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -66,7 +66,7 @@ func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool
args[i] = strings.Trim(string(cmds[1+i]), "\"'")
}
- cmd := strings.ToLower(cmds[0])
+ cmd := cmds[0]
if cmd == "help" || cmd == "?" {
printHelp(cmds)
} else if cmd == "exit" || cmd == "quit" {