aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-10-20 16:11:29 -0700
committerGitHub <noreply@github.com>2025-10-20 16:11:29 -0700
commitffd43218f62b024580bb2a70dc86c36ef5501e0f (patch)
tree4f8f419e62c5e3dd610a96c04b2bc1818dc05379
parent3e8cc5a90668ad934a6f86a4e93783f2b7bb1364 (diff)
downloadseaweedfs-ffd43218f62b024580bb2a70dc86c36ef5501e0f.tar.xz
seaweedfs-ffd43218f62b024580bb2a70dc86c36ef5501e0f.zip
create new volumes on less occupied disk locations (#7349)
* create new volumes on less occupied disk locations * add unit tests * address comments * fixes
-rw-r--r--weed/storage/disk_location.go13
-rw-r--r--weed/storage/store.go12
-rw-r--r--weed/storage/store_load_balancing_simple_test.go51
-rw-r--r--weed/storage/store_load_balancing_test.go256
4 files changed, 328 insertions, 4 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 02f5f5923..aac824318 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -386,6 +386,19 @@ func (l *DiskLocation) VolumesLen() int {
return len(l.volumes)
}
+func (l *DiskLocation) LocalVolumesLen() int {
+ l.volumesLock.RLock()
+ defer l.volumesLock.RUnlock()
+
+ count := 0
+ for _, v := range l.volumes {
+ if !v.HasRemoteFile() {
+ count++
+ }
+ }
+ return count
+}
+
func (l *DiskLocation) SetStopping() {
l.volumesLock.Lock()
for _, v := range l.volumes {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 1d625dd69..7c41f1c35 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -165,14 +165,18 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
return fmt.Errorf("Volume Id %d already exists!", vid)
}
- // Find location and its index
+ // Find location with lowest local volume count (load balancing)
var location *DiskLocation
var diskId uint32
+ var minVolCount int
for i, loc := range s.Locations {
if loc.DiskType == diskType && s.hasFreeDiskLocation(loc) {
- location = loc
- diskId = uint32(i)
- break
+ volCount := loc.LocalVolumesLen()
+ if location == nil || volCount < minVolCount {
+ location = loc
+ diskId = uint32(i)
+ minVolCount = volCount
+ }
}
}
diff --git a/weed/storage/store_load_balancing_simple_test.go b/weed/storage/store_load_balancing_simple_test.go
new file mode 100644
index 000000000..87e4636db
--- /dev/null
+++ b/weed/storage/store_load_balancing_simple_test.go
@@ -0,0 +1,51 @@
+package storage
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+// TestLoadBalancingDistribution tests that volumes are evenly distributed
+func TestLoadBalancingDistribution(t *testing.T) {
+ // Create test store with 3 directories
+ store := newTestStore(t, 3)
+
+ // Create 9 volumes and verify they're evenly distributed
+ volumesToCreate := 9
+ for i := 1; i <= volumesToCreate; i++ {
+ volumeId := needle.VolumeId(i)
+
+ err := store.AddVolume(volumeId, "", NeedleMapInMemory, "000", "",
+ 0, needle.GetCurrentVersion(), 0, types.HardDriveType, 3)
+
+ if err != nil {
+ t.Fatalf("Failed to add volume %d: %v", volumeId, err)
+ }
+ }
+
+ // Check distribution - should be 3 volumes per location
+ for i, location := range store.Locations {
+ localCount := location.LocalVolumesLen()
+ if localCount != 3 {
+ t.Errorf("Location %d: expected 3 local volumes, got %d", i, localCount)
+ }
+ }
+
+ // Verify specific distribution pattern
+ expected := map[int][]needle.VolumeId{
+ 0: {1, 4, 7},
+ 1: {2, 5, 8},
+ 2: {3, 6, 9},
+ }
+
+ for locIdx, expectedVols := range expected {
+ location := store.Locations[locIdx]
+ for _, vid := range expectedVols {
+ if _, found := location.FindVolume(vid); !found {
+ t.Errorf("Location %d: expected to find volume %d, but it's not there", locIdx, vid)
+ }
+ }
+ }
+}
diff --git a/weed/storage/store_load_balancing_test.go b/weed/storage/store_load_balancing_test.go
new file mode 100644
index 000000000..15e709d53
--- /dev/null
+++ b/weed/storage/store_load_balancing_test.go
@@ -0,0 +1,256 @@
+package storage
+
+import (
+ "os"
+ "path/filepath"
+ "strconv"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// newTestStore creates a test store with the specified number of directories
+func newTestStore(t *testing.T, numDirs int) *Store {
+ tempDir := t.TempDir()
+
+ var dirs []string
+ var maxCounts []int32
+ var minFreeSpaces []util.MinFreeSpace
+ var diskTypes []types.DiskType
+
+ for i := 0; i < numDirs; i++ {
+ dir := filepath.Join(tempDir, "dir"+strconv.Itoa(i))
+ os.MkdirAll(dir, 0755)
+ dirs = append(dirs, dir)
+ maxCounts = append(maxCounts, 100) // high limit
+ minFreeSpaces = append(minFreeSpaces, util.MinFreeSpace{})
+ diskTypes = append(diskTypes, types.HardDriveType)
+ }
+
+ store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080",
+ dirs, maxCounts, minFreeSpaces, "", NeedleMapInMemory, diskTypes, 3)
+
+ // Consume channel messages to prevent blocking
+ done := make(chan bool)
+ go func() {
+ for {
+ select {
+ case <-store.NewVolumesChan:
+ case <-done:
+ return
+ }
+ }
+ }()
+ t.Cleanup(func() { close(done) })
+
+ return store
+}
+
+func TestLocalVolumesLen(t *testing.T) {
+ testCases := []struct {
+ name string
+ totalVolumes int
+ remoteVolumes int
+ expectedLocalCount int
+ }{
+ {
+ name: "all local volumes",
+ totalVolumes: 5,
+ remoteVolumes: 0,
+ expectedLocalCount: 5,
+ },
+ {
+ name: "all remote volumes",
+ totalVolumes: 5,
+ remoteVolumes: 5,
+ expectedLocalCount: 0,
+ },
+ {
+ name: "mixed local and remote",
+ totalVolumes: 10,
+ remoteVolumes: 3,
+ expectedLocalCount: 7,
+ },
+ {
+ name: "no volumes",
+ totalVolumes: 0,
+ remoteVolumes: 0,
+ expectedLocalCount: 0,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ diskLocation := &DiskLocation{
+ volumes: make(map[needle.VolumeId]*Volume),
+ }
+
+ // Add volumes
+ for i := 0; i < tc.totalVolumes; i++ {
+ vol := &Volume{
+ Id: needle.VolumeId(i + 1),
+ volumeInfo: &volume_server_pb.VolumeInfo{},
+ }
+
+ // Mark some as remote
+ if i < tc.remoteVolumes {
+ vol.hasRemoteFile = true
+ vol.volumeInfo.Files = []*volume_server_pb.RemoteFile{
+ {BackendType: "s3", BackendId: "test", Key: "test-key"},
+ }
+ }
+
+ diskLocation.volumes[vol.Id] = vol
+ }
+
+ result := diskLocation.LocalVolumesLen()
+
+ if result != tc.expectedLocalCount {
+ t.Errorf("Expected LocalVolumesLen() = %d; got %d (total: %d, remote: %d)",
+ tc.expectedLocalCount, result, tc.totalVolumes, tc.remoteVolumes)
+ }
+ })
+ }
+}
+
+func TestVolumeLoadBalancing(t *testing.T) {
+ testCases := []struct {
+ name string
+ locations []locationSetup
+ expectedLocations []int // which location index should get each volume
+ }{
+ {
+ name: "even distribution across empty locations",
+ locations: []locationSetup{
+ {localVolumes: 0, remoteVolumes: 0},
+ {localVolumes: 0, remoteVolumes: 0},
+ {localVolumes: 0, remoteVolumes: 0},
+ },
+ expectedLocations: []int{0, 1, 2, 0, 1, 2}, // round-robin
+ },
+ {
+ name: "prefers location with fewer local volumes",
+ locations: []locationSetup{
+ {localVolumes: 5, remoteVolumes: 0},
+ {localVolumes: 2, remoteVolumes: 0},
+ {localVolumes: 8, remoteVolumes: 0},
+ },
+ expectedLocations: []int{1, 1, 1}, // all go to location 1 (has fewest)
+ },
+ {
+ name: "ignores remote volumes in count",
+ locations: []locationSetup{
+ {localVolumes: 2, remoteVolumes: 10}, // 2 local, 10 remote
+ {localVolumes: 5, remoteVolumes: 0}, // 5 local
+ {localVolumes: 3, remoteVolumes: 0}, // 3 local
+ },
+ // expectedLocations: []int{0, 0, 2}
+ // Explanation:
+ // 1. Initial local counts: [2, 5, 3]. First volume goes to location 0 (2 local, ignoring 10 remote).
+ // 2. New local counts: [3, 5, 3]. Second volume goes to location 0 (first with min count 3).
+ // 3. New local counts: [4, 5, 3]. Third volume goes to location 2 (3 local < 4 local).
+ expectedLocations: []int{0, 0, 2},
+ },
+ {
+ name: "balances when some locations have remote volumes",
+ locations: []locationSetup{
+ {localVolumes: 1, remoteVolumes: 5},
+ {localVolumes: 1, remoteVolumes: 0},
+ {localVolumes: 0, remoteVolumes: 3},
+ },
+ // expectedLocations: []int{2, 0, 1}
+ // Explanation:
+ // 1. Initial local counts: [1, 1, 0]. First volume goes to location 2 (0 local).
+ // 2. New local counts: [1, 1, 1]. Second volume goes to location 0 (first with min count 1).
+ // 3. New local counts: [2, 1, 1]. Third volume goes to location 1 (next with min count 1).
+ expectedLocations: []int{2, 0, 1},
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Create test store with multiple directories
+ store := newTestStore(t, len(tc.locations))
+
+ // Pre-populate locations with volumes
+ for locIdx, setup := range tc.locations {
+ location := store.Locations[locIdx]
+ vidCounter := 1000 + locIdx*100 // unique volume IDs per location
+
+ // Add local volumes
+ for i := 0; i < setup.localVolumes; i++ {
+ vol := createTestVolume(needle.VolumeId(vidCounter), false)
+ location.SetVolume(vol.Id, vol)
+ vidCounter++
+ }
+
+ // Add remote volumes
+ for i := 0; i < setup.remoteVolumes; i++ {
+ vol := createTestVolume(needle.VolumeId(vidCounter), true)
+ location.SetVolume(vol.Id, vol)
+ vidCounter++
+ }
+ }
+
+ // Create volumes and verify they go to expected locations
+ for i, expectedLoc := range tc.expectedLocations {
+ volumeId := needle.VolumeId(i + 1)
+
+ err := store.AddVolume(volumeId, "", NeedleMapInMemory, "000", "",
+ 0, needle.GetCurrentVersion(), 0, types.HardDriveType, 3)
+
+ if err != nil {
+ t.Fatalf("Failed to add volume %d: %v", volumeId, err)
+ }
+
+ // Find which location got the volume
+ actualLoc := -1
+ for locIdx, location := range store.Locations {
+ if _, found := location.FindVolume(volumeId); found {
+ actualLoc = locIdx
+ break
+ }
+ }
+
+ if actualLoc != expectedLoc {
+ t.Errorf("Volume %d: expected location %d, got location %d",
+ volumeId, expectedLoc, actualLoc)
+
+ // Debug info
+ for locIdx, loc := range store.Locations {
+ localCount := loc.LocalVolumesLen()
+ totalCount := loc.VolumesLen()
+ t.Logf(" Location %d: %d local, %d total", locIdx, localCount, totalCount)
+ }
+ }
+ }
+ })
+ }
+}
+
+// Helper types and functions
+type locationSetup struct {
+ localVolumes int
+ remoteVolumes int
+}
+
+func createTestVolume(vid needle.VolumeId, isRemote bool) *Volume {
+ vol := &Volume{
+ Id: vid,
+ SuperBlock: super_block.SuperBlock{},
+ volumeInfo: &volume_server_pb.VolumeInfo{},
+ }
+
+ if isRemote {
+ vol.hasRemoteFile = true
+ vol.volumeInfo.Files = []*volume_server_pb.RemoteFile{
+ {BackendType: "s3", BackendId: "test", Key: "remote-key-" + strconv.Itoa(int(vid))},
+ }
+ }
+
+ return vol
+}