aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_ec_common.go70
-rw-r--r--weed/shell/command_ec_common_test.go87
-rw-r--r--weed/shell/command_ec_encode_test.go31
-rw-r--r--weed/shell/command_ec_test.go2
-rw-r--r--weed/storage/super_block/replica_placement.go9
5 files changed, 141 insertions, 58 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index fd7a1acdc..b98921fd7 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,7 +3,6 @@ package shell
import (
"context"
"fmt"
- "math"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -12,11 +11,32 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
)
+type DataCenterId string
+type EcNodeId string
+type RackId string
+
+type EcNode struct {
+ info *master_pb.DataNodeInfo
+ dc DataCenterId
+ rack RackId
+ freeEcSlot int
+}
+type CandidateEcNode struct {
+ ecNode *EcNode
+ shardCount int
+}
+
+type EcRack struct {
+ ecNodes map[EcNodeId]*EcNode
+ freeEcSlot int
+}
+
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
@@ -68,7 +88,6 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if targetAddress != existingLocation {
-
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
@@ -109,6 +128,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
return
}
+// TODO: Make dc a DataCenterId instead of string.
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
for _, dc := range topo.DataCenterInfos {
for _, rack := range dc.RackInfos {
@@ -131,11 +151,6 @@ func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
})
}
-type CandidateEcNode struct {
- ecNode *EcNode
- shardCount int
-}
-
// if the index node changed the freeEcSlot, need to keep every EcNode still sorted
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
for i := index - 1; i >= 0; i-- {
@@ -179,16 +194,6 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (c
return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}
-type RackId string
-type EcNodeId string
-
-type EcNode struct {
- info *master_pb.DataNodeInfo
- dc string
- rack RackId
- freeEcSlot int
-}
-
func (ecNode *EcNode) localShardIdCount(vid uint32) int {
for _, diskInfo := range ecNode.info.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
@@ -201,13 +206,7 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
return 0
}
-type EcRack struct {
- ecNodes map[EcNodeId]*EcNode
- freeEcSlot int
-}
-
func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
-
// list all possible locations
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@@ -232,7 +231,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
ecNodes = append(ecNodes, &EcNode{
info: dn,
- dc: dc,
+ dc: DataCenterId(dc),
rack: rack,
freeEcSlot: int(freeEcSlots),
})
@@ -283,8 +282,12 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
})
}
-func ceilDivide(total, n int) int {
- return int(math.Ceil(float64(total) / float64(n)))
+func ceilDivide(a, b int) int {
+ var r int
+ if (a % b) != 0 {
+ r = 1
+ }
+ return (a / b) + r
}
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
@@ -772,6 +775,21 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
return vidLocations
}
+func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
+ for _, ecNode := range nodes {
+ for _, diskInfo := range ecNode.info.DiskInfos {
+ for _, volumeInfo := range diskInfo.VolumeInfos {
+ if needle.VolumeId(volumeInfo.Id) != vid {
+ continue
+ }
+ return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+ }
+ }
+ }
+
+ return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
+}
+
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
if len(collections) == 0 {
return fmt.Errorf("no collections to balance")
diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go
new file mode 100644
index 000000000..412599115
--- /dev/null
+++ b/weed/shell/command_ec_common_test.go
@@ -0,0 +1,87 @@
+package shell
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+)
+
+func TestEcDistribution(t *testing.T) {
+
+ topologyInfo := parseOutput(topoData)
+
+ // find out all volume servers with one slot left.
+ ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topologyInfo, "")
+
+ sortEcNodesByFreeslotsDescending(ecNodes)
+
+ if totalFreeEcSlots < erasure_coding.TotalShardsCount {
+ t.Errorf("not enough free ec shard slots: %d", totalFreeEcSlots)
+ }
+ allocatedDataNodes := ecNodes
+ if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
+ allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
+ }
+
+ for _, dn := range allocatedDataNodes {
+ // fmt.Printf("info %+v %+v\n", dn.info, dn)
+ fmt.Printf("=> %+v %+v\n", dn.info.Id, dn.freeEcSlot)
+ }
+}
+
+func TestVolumeIdToReplicaPlacement(t *testing.T) {
+ topo1 := parseOutput(topoData)
+ topo2 := parseOutput(topoData2)
+
+ testCases := []struct {
+ topology *master_pb.TopologyInfo
+ vid string
+ want string
+ wantErr string
+ }{
+ {topo1, "", "", "failed to resolve replica placement for volume ID 0"},
+ {topo1, "0", "", "failed to resolve replica placement for volume ID 0"},
+ {topo1, "1", "100", ""},
+ {topo1, "296", "100", ""},
+ {topo2, "", "", "failed to resolve replica placement for volume ID 0"},
+ {topo2, "19012", "", "failed to resolve replica placement for volume ID 19012"},
+ {topo2, "6271", "002", ""},
+ {topo2, "17932", "002", ""},
+ }
+
+ for _, tc := range testCases {
+ vid, _ := needle.NewVolumeId(tc.vid)
+ ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+ got, gotErr := volumeIdToReplicaPlacement(vid, ecNodes)
+
+ if tc.wantErr == "" && gotErr != nil {
+ t.Errorf("expected no error for volume '%s', got '%s'", tc.vid, gotErr.Error())
+ continue
+ }
+ if tc.wantErr != "" {
+ if gotErr == nil {
+ t.Errorf("got no error for volume '%s', expected '%s'", tc.vid, tc.wantErr)
+ continue
+ }
+ if gotErr.Error() != tc.wantErr {
+ t.Errorf("expected error '%s' for volume '%s', got '%s'", tc.wantErr, tc.vid, gotErr.Error())
+ continue
+ }
+ }
+
+ if got == nil {
+ if tc.want != "" {
+ t.Errorf("expected replica placement '%s' for volume '%s', got nil", tc.want, tc.vid)
+ }
+ continue
+ }
+ want, _ := super_block.NewReplicaPlacementFromString(tc.want)
+ if !got.Equals(want) {
+ t.Errorf("got replica placement '%s' for volune '%s', want '%s'", got.String(), tc.vid, want.String())
+ }
+ }
+}
diff --git a/weed/shell/command_ec_encode_test.go b/weed/shell/command_ec_encode_test.go
deleted file mode 100644
index 346e2af14..000000000
--- a/weed/shell/command_ec_encode_test.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package shell
-
-import (
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
- "testing"
-)
-
-func TestEcDistribution(t *testing.T) {
-
- topologyInfo := parseOutput(topoData)
-
- // find out all volume servers with one slot left.
- ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topologyInfo, "")
-
- sortEcNodesByFreeslotsDescending(ecNodes)
-
- if totalFreeEcSlots < erasure_coding.TotalShardsCount {
- println("not enough free ec shard slots", totalFreeEcSlots)
- }
- allocatedDataNodes := ecNodes
- if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
- allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
- }
-
- for _, dn := range allocatedDataNodes {
- // fmt.Printf("info %+v %+v\n", dn.info, dn)
- fmt.Printf("=> %+v %+v\n", dn.info.Id, dn.freeEcSlot)
- }
-
-}
diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go
index 25380ddca..ef9461ef0 100644
--- a/weed/shell/command_ec_test.go
+++ b/weed/shell/command_ec_test.go
@@ -129,7 +129,7 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod
Id: dataNodeId,
DiskInfos: make(map[string]*master_pb.DiskInfo),
},
- dc: dc,
+ dc: DataCenterId(dc),
rack: RackId(rack),
freeEcSlot: freeEcSlot,
}
diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go
index b2bf21fcb..f6d14e25b 100644
--- a/weed/storage/super_block/replica_placement.go
+++ b/weed/storage/super_block/replica_placement.go
@@ -45,6 +45,15 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
return NewReplicaPlacementFromString(fmt.Sprintf("%03d", b))
}
+func (a *ReplicaPlacement) Equals(b *ReplicaPlacement) bool {
+ if a == nil || b == nil {
+ return false
+ }
+ return (a.SameRackCount == b.SameRackCount &&
+ a.DiffRackCount == b.DiffRackCount &&
+ a.DiffDataCenterCount == b.DiffDataCenterCount)
+}
+
func (rp *ReplicaPlacement) Byte() byte {
if rp == nil {
return 0