1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
|
package shell
import (
"bytes"
"strings"
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
// TestEcShardMapRegister tests that EC shards are properly registered
func TestEcShardMapRegister(t *testing.T) {
ecShardMap := make(EcShardMap)
// Create test nodes with EC shards
node1 := newEcNode("dc1", "rack1", "node1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6})
node2 := newEcNode("dc1", "rack1", "node2", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13})
ecShardMap.registerEcNode(node1, "c1")
ecShardMap.registerEcNode(node2, "c1")
// Verify volume 1 is registered
locations, found := ecShardMap[needle.VolumeId(1)]
if !found {
t.Fatal("Expected volume 1 to be registered")
}
// Check shard count
count := locations.shardCount()
if count != erasure_coding.TotalShardsCount {
t.Errorf("Expected %d shards, got %d", erasure_coding.TotalShardsCount, count)
}
// Verify shard distribution
for i := 0; i < 7; i++ {
if len(locations[i]) != 1 || locations[i][0].info.Id != "node1" {
t.Errorf("Shard %d should be on node1", i)
}
}
for i := 7; i < erasure_coding.TotalShardsCount; i++ {
if len(locations[i]) != 1 || locations[i][0].info.Id != "node2" {
t.Errorf("Shard %d should be on node2", i)
}
}
}
// TestEcShardMapShardCount is a table-driven check of
// EcShardLocations.shardCount: for each case it fills the listed shard slots
// with a single node and expects shardCount to report how many slots were
// filled.
func TestEcShardMapShardCount(t *testing.T) {
	type scenario struct {
		name          string
		shardIds      []uint32 // slots to populate before counting
		expectedCount int
	}

	scenarios := []scenario{
		{name: "all shards", shardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, expectedCount: 14},
		{name: "data shards only", shardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, expectedCount: 10},
		{name: "parity shards only", shardIds: []uint32{10, 11, 12, 13}, expectedCount: 4},
		{name: "missing some shards", shardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8}, expectedCount: 9},
		{name: "single shard", shardIds: []uint32{0}, expectedCount: 1},
		{name: "no shards", shardIds: []uint32{}, expectedCount: 0},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Start from an empty location table and fill only the
			// slots named by the scenario.
			locations := make(EcShardLocations, erasure_coding.MaxShardCount)
			for _, id := range sc.shardIds {
				locations[id] = []*EcNode{newEcNode("dc1", "rack1", "node1", 100)}
			}

			if got := locations.shardCount(); got != sc.expectedCount {
				t.Errorf("Expected %d shards, got %d", sc.expectedCount, got)
			}
		})
	}
}
// TestRebuildEcVolumesInsufficientShards runs rebuildEcVolumes against a
// volume that has only five shards and expects the rebuilder's error wait
// group to surface an error whose text contains "unrepairable".
func TestRebuildEcVolumesInsufficientShards(t *testing.T) {
	// Volume 1 exists only as shard ids 0-4 on a single node, which the test
	// treats as too few to rebuild from (fewer than DataShardsCount).
	partial := newEcNode("dc1", "rack1", "node1", 100).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4})

	output := new(bytes.Buffer)
	env := &CommandEnv{
		env:    make(map[string]string),
		noLock: true, // skip the lock check; there is no cluster in a unit test
	}
	rebuilder := &ecRebuilder{
		commandEnv: env,
		ewg:        NewErrorWaitGroup(DefaultMaxParallelization),
		ecNodes:    []*EcNode{partial},
		writer:     output,
	}

	rebuilder.rebuildEcVolumes("c1")

	// Failures are collected by the wait group rather than returned directly.
	err := rebuilder.ewg.Wait()
	switch {
	case err == nil:
		t.Fatal("Expected error for insufficient shards, got nil")
	case !strings.Contains(err.Error(), "unrepairable"):
		t.Errorf("Expected 'unrepairable' in error message, got: %s", err.Error())
	}
}
// TestRebuildEcVolumesCompleteVolume runs rebuildEcVolumes against a volume
// whose shard ids 0 through 13 are all present on one node and expects the
// rebuilder's error wait group to finish without an error.
func TestRebuildEcVolumesCompleteVolume(t *testing.T) {
	// A single node holds every shard of volume 1, so nothing is missing.
	complete := newEcNode("dc1", "rack1", "node1", 100).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})

	output := new(bytes.Buffer)
	rebuilder := &ecRebuilder{
		commandEnv: &CommandEnv{
			env:    make(map[string]string),
			noLock: true, // skip the lock check; there is no cluster in a unit test
		},
		ewg:          NewErrorWaitGroup(DefaultMaxParallelization),
		ecNodes:      []*EcNode{complete},
		writer:       output,
		applyChanges: false,
	}

	rebuilder.rebuildEcVolumes("c1")

	// With no shard missing, the rebuilder is expected to skip the volume
	// instead of attempting a rebuild, so no error should be recorded.
	if err := rebuilder.ewg.Wait(); err != nil {
		t.Fatalf("Expected no error for complete volume, got: %v", err)
	}
}
// TestRebuildEcVolumesInsufficientSpace runs rebuildEcVolumes for a
// repairable volume on a node that lacks room for the missing shards, and
// expects an error that names the problem and carries "need" / "max available"
// diagnostics.
func TestRebuildEcVolumesInsufficientSpace(t *testing.T) {
	// The node holds shard ids 0-9; ids 10-13 are missing, so a rebuild needs
	// four free slots. The node is created with only three.
	cramped := newEcNode("dc1", "rack1", "node1", 3).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})

	output := new(bytes.Buffer)
	rebuilder := &ecRebuilder{
		commandEnv: &CommandEnv{
			env:    make(map[string]string),
			noLock: true, // skip the lock check; there is no cluster in a unit test
		},
		ewg:          NewErrorWaitGroup(DefaultMaxParallelization),
		ecNodes:      []*EcNode{cramped},
		writer:       output,
		applyChanges: false,
	}

	rebuilder.rebuildEcVolumes("c1")

	err := rebuilder.ewg.Wait()
	if err == nil {
		t.Fatal("Expected error for insufficient disk space, got nil")
	}
	msg := err.Error()

	if !strings.Contains(msg, "no node has sufficient free slots") {
		t.Errorf("Expected 'no node has sufficient free slots' in error message, got: %s", msg)
	}

	// The message should also say how many slots were needed and how many
	// the best candidate node could offer.
	if !(strings.Contains(msg, "need") && strings.Contains(msg, "max available")) {
		t.Errorf("Expected diagnostic information in error message, got: %s", msg)
	}
}
// TestMultipleNodesWithShards checks shard registration when one volume's
// shards are spread over three nodes: node1 holds shard ids 0-3, node2 holds
// 4-7 and node3 holds 8-9. After registering all three, the EcShardMap entry
// for volume 1 must count ten shards and attribute each one to exactly the
// node that was given it.
func TestMultipleNodesWithShards(t *testing.T) {
	ecShardMap := make(EcShardMap)

	node1 := newEcNode("dc1", "rack1", "node1", 100).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3})
	node2 := newEcNode("dc1", "rack1", "node2", 100).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{4, 5, 6, 7})
	node3 := newEcNode("dc1", "rack1", "node3", 100).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{8, 9})

	ecShardMap.registerEcNode(node1, "c1")
	ecShardMap.registerEcNode(node2, "c1")
	ecShardMap.registerEcNode(node3, "c1")

	// Stop here if registration produced no entry or too short an entry:
	// the per-shard checks below index into locations and would otherwise
	// panic with an index-out-of-range instead of reporting a test failure.
	locations, found := ecShardMap[needle.VolumeId(1)]
	if !found {
		t.Fatal("Expected volume 1 to be registered")
	}
	if len(locations) < 10 {
		t.Fatalf("Expected at least 10 shard slots, got %d", len(locations))
	}

	// Ten shards are registered in total across the three nodes.
	if count := locations.shardCount(); count != 10 {
		t.Errorf("Expected 10 shards, got %d", count)
	}

	// Each half-open range [first, end) of shard ids belongs to one node.
	owners := []struct {
		first, end int
		nodeID     string
	}{
		{first: 0, end: 4, nodeID: "node1"},
		{first: 4, end: 8, nodeID: "node2"},
		{first: 8, end: 10, nodeID: "node3"},
	}
	for _, o := range owners {
		for i := o.first; i < o.end; i++ {
			if len(locations[i]) != 1 || locations[i][0].info.Id != o.nodeID {
				t.Errorf("Shard %d should be on %s", i, o.nodeID)
			}
		}
	}
}
// TestDuplicateShards checks registration when the same shard is present on
// more than one node: node1 holds shard ids 0-3 and node2 holds 0, 4, 5, 6.
// Shard 0 must then list both nodes, while shardCount must still count it
// once, giving seven shards.
func TestDuplicateShards(t *testing.T) {
	ecShardMap := make(EcShardMap)

	node1 := newEcNode("dc1", "rack1", "node1", 100).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3})
	node2 := newEcNode("dc1", "rack1", "node2", 100).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 4, 5, 6}) // shard 0 duplicates node1's

	ecShardMap.registerEcNode(node1, "c1")
	ecShardMap.registerEcNode(node2, "c1")

	// Stop here if registration produced no usable entry: locations[0] below
	// would otherwise panic with an index-out-of-range instead of reporting
	// a test failure.
	locations, found := ecShardMap[needle.VolumeId(1)]
	if !found {
		t.Fatal("Expected volume 1 to be registered")
	}
	if len(locations) == 0 {
		t.Fatal("Expected volume 1 to have shard slots, got none")
	}

	// Shard 0 should be recorded on both nodes.
	holders := locations[0]
	if len(holders) != 2 {
		t.Errorf("Expected shard 0 on 2 nodes, got %d", len(holders))
	}

	// Check identity as well as count, so that two entries for the same
	// node do not pass.
	foundNode1, foundNode2 := false, false
	for _, node := range holders {
		switch node.info.Id {
		case "node1":
			foundNode1 = true
		case "node2":
			foundNode2 = true
		}
	}
	if !foundNode1 || !foundNode2 {
		t.Error("Both nodes should have shard 0")
	}

	// Unique shard ids are 0, 1, 2, 3, 4, 5, 6; the duplicate counts once.
	if count := locations.shardCount(); count != 7 {
		t.Errorf("Expected 7 unique shards, got %d", count)
	}
}
|