author     Chris Lu <chrislusf@users.noreply.github.com>  2025-11-17 21:19:55 -0800
committer  GitHub <noreply@github.com>  2025-11-17 21:19:55 -0800
commit     65f8986fe22a24d595159b8b981648d9788d8eb7 (patch)
tree       ab18c0cb7c8f1467b21900577832f5b283a39ae0
parent     fa8df6e42b991f3bd6e202ed852d33d290e2dd24 (diff)
Volume Server: avoid aggressive volume assignment (#7501)
* avoid aggressive volume assignment
* also test ec shards
* separate DiskLocation instances for each subtest
* edge cases
* No volumes plus low disk space
* Multiple EC volumes
* simplify
-rw-r--r--  weed/storage/store.go                   12
-rw-r--r--  weed/storage/store_disk_space_test.go  100
2 files changed, 111 insertions(+), 1 deletion(-)
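In short, the change caps the capacity a volume server advertises in its heartbeat: when a disk location reports low free space, it advertises only the volume slots already in use (regular volumes plus EC shards converted to slots by ceiling division), so the master stops assigning new volumes to that disk. A minimal standalone sketch of that calculation, assuming a data-shard count of 10 in place of the real erasure_coding.DataShardsCount constant (this is an illustration, not the store.go code itself):

package main

import "fmt"

// effectiveMaxVolumeCount mirrors the heartbeat capping idea from this commit:
// when disk space is low, report only the slots already in use so the master
// assigns nothing new to this location.
func effectiveMaxVolumeCount(maxCount, volumeCount, ecShardCount int32, isDiskSpaceLow bool) int32 {
	if !isDiskSpaceLow {
		return maxCount
	}
	const dataShardsCount = 10 // assumption: stands in for erasure_coding.DataShardsCount
	// Every dataShardsCount EC shards occupy roughly one volume slot (ceiling division).
	usedSlots := volumeCount + (ecShardCount+dataShardsCount-1)/dataShardsCount
	if usedSlots < 0 {
		usedSlots = 0
	}
	return usedSlots
}

func main() {
	fmt.Println(effectiveMaxVolumeCount(10, 3, 15, true))  // low disk space: 3 volumes + ceil(15/10) = 5
	fmt.Println(effectiveMaxVolumeCount(10, 3, 15, false)) // normal disk space: configured max, 10
}

The diffs below show the actual implementation in CollectHeartbeat and the subtests covering the normal, low-space, zero-volume, and EC-shard cases.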
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 7c41f1c35..cc07f8702 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -292,7 +292,17 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 	collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
 	for _, location := range s.Locations {
 		var deleteVids []needle.VolumeId
-		maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount)
+		effectiveMaxCount := location.MaxVolumeCount
+		if location.isDiskSpaceLow {
+			usedSlots := int32(location.LocalVolumesLen())
+			ecShardCount := location.EcShardCount()
+			usedSlots += int32((ecShardCount + erasure_coding.DataShardsCount - 1) / erasure_coding.DataShardsCount)
+			effectiveMaxCount = usedSlots
+		}
+		if effectiveMaxCount < 0 {
+			effectiveMaxCount = 0
+		}
+		maxVolumeCounts[string(location.DiskType)] += uint32(effectiveMaxCount)
 		location.volumesLock.RLock()
 		for _, v := range location.volumes {
 			curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
diff --git a/weed/storage/store_disk_space_test.go b/weed/storage/store_disk_space_test.go
index 284657e3c..884b8dda1 100644
--- a/weed/storage/store_disk_space_test.go
+++ b/weed/storage/store_disk_space_test.go
@@ -3,7 +3,9 @@ package storage
 import (
 	"testing"
 
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 func TestHasFreeDiskLocation(t *testing.T) {
@@ -92,3 +94,101 @@ func TestHasFreeDiskLocation(t *testing.T) {
 		})
 	}
 }
+
+func newTestLocation(maxCount int32, isDiskLow bool, volCount int) *DiskLocation {
+	location := &DiskLocation{
+		volumes:        make(map[needle.VolumeId]*Volume),
+		ecVolumes:      make(map[needle.VolumeId]*erasure_coding.EcVolume),
+		MaxVolumeCount: maxCount,
+		DiskType:       types.ToDiskType("hdd"),
+		isDiskSpaceLow: isDiskLow,
+	}
+	for i := 1; i <= volCount; i++ {
+		location.volumes[needle.VolumeId(i)] = &Volume{}
+	}
+	return location
+}
+
+func TestCollectHeartbeatRespectsLowDiskSpace(t *testing.T) {
+	diskType := types.ToDiskType("hdd")
+
+	t.Run("low disk space", func(t *testing.T) {
+		location := newTestLocation(10, true, 3)
+		store := &Store{Locations: []*DiskLocation{location}}
+
+		hb := store.CollectHeartbeat()
+		if got := hb.MaxVolumeCounts[string(diskType)]; got != 3 {
+			t.Errorf("expected low disk space to cap max volume count to used slots, got %d", got)
+		}
+	})
+
+	t.Run("normal disk space", func(t *testing.T) {
+		location := newTestLocation(10, false, 3)
+		store := &Store{Locations: []*DiskLocation{location}}
+
+		hb := store.CollectHeartbeat()
+		if got := hb.MaxVolumeCounts[string(diskType)]; got != 10 {
+			t.Errorf("expected normal disk space to report configured max volume count, got %d", got)
+		}
+	})
+
+	t.Run("low disk space zero volumes", func(t *testing.T) {
+		location := newTestLocation(10, true, 0)
+		store := &Store{Locations: []*DiskLocation{location}}
+
+		hb := store.CollectHeartbeat()
+		if got := hb.MaxVolumeCounts[string(diskType)]; got != 0 {
+			t.Errorf("expected zero volumes to report zero capacity, got %d", got)
+		}
+	})
+
+	t.Run("low disk space with ec shards", func(t *testing.T) {
+		location := newTestLocation(10, true, 3)
+
+		ecVolume := &erasure_coding.EcVolume{VolumeId: 1}
+		const shardCount = 15
+		for i := 0; i < shardCount; i++ {
+			ecVolume.Shards = append(ecVolume.Shards, &erasure_coding.EcVolumeShard{
+				ShardId: erasure_coding.ShardId(i),
+			})
+		}
+		location.ecVolumes[ecVolume.VolumeId] = ecVolume
+		store := &Store{Locations: []*DiskLocation{location}}
+
+		hb := store.CollectHeartbeat()
+		expectedSlots := len(location.volumes) + (shardCount+erasure_coding.DataShardsCount-1)/erasure_coding.DataShardsCount
+		if got := hb.MaxVolumeCounts[string(diskType)]; got != uint32(expectedSlots) {
+			t.Errorf("expected low disk space to include ec shard contribution, got %d want %d", got, expectedSlots)
+		}
+	})
+
+	t.Run("low disk space with multiple ec volumes", func(t *testing.T) {
+		location := newTestLocation(10, true, 2)
+
+		totalShardCount := 0
+
+		addEcVolume := func(vid needle.VolumeId, shardCount int) {
+			ecVolume := &erasure_coding.EcVolume{VolumeId: vid}
+			for i := 0; i < shardCount; i++ {
+				ecVolume.Shards = append(ecVolume.Shards, &erasure_coding.EcVolumeShard{
+					ShardId: erasure_coding.ShardId(i),
+				})
+			}
+			location.ecVolumes[vid] = ecVolume
+			totalShardCount += shardCount
+		}
+
+		addEcVolume(1, 12)
+		addEcVolume(2, 6)
+
+		store := &Store{Locations: []*DiskLocation{location}}
+
+		hb := store.CollectHeartbeat()
+		expectedSlots := len(location.volumes)
+		expectedSlots += (totalShardCount + erasure_coding.DataShardsCount - 1) / erasure_coding.DataShardsCount
+
+		if got := hb.MaxVolumeCounts[string(diskType)]; got != uint32(expectedSlots) {
+			t.Errorf("expected multiple ec volumes to be counted, got %d want %d", got, expectedSlots)
+		}
+	})
+}
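
To run just the affected tests locally, a standard go test invocation from the repository root should work (this command is an illustration, not part of the commit):

go test ./weed/storage/ -run 'TestCollectHeartbeatRespectsLowDiskSpace|TestHasFreeDiskLocation' -v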