aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-22 16:21:42 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-22 16:21:42 -0700
commit3137777d8395111f6c1eb4b3653e13f4961b8510 (patch)
tree14cfd08d09cd27c5886dd053a9af62b74f1188c5
parent0bf148f49d63a834000f7bce63df2c0f4d78fa19 (diff)
downloadseaweedfs-3137777d8395111f6c1eb4b3653e13f4961b8510.tar.xz
seaweedfs-3137777d8395111f6c1eb4b3653e13f4961b8510.zip
volume: automatically detect max volume count
-rw-r--r--weed/server/master_grpc_server.go5
-rw-r--r--weed/server/volume_grpc_client_to_master.go7
-rw-r--r--weed/storage/disk_location.go16
-rw-r--r--weed/storage/disk_location_ec.go7
-rw-r--r--weed/storage/erasure_coding/ec_volume.go7
-rw-r--r--weed/storage/store.go25
6 files changed, 66 insertions, 1 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 84087df8b..cfe5fd9c0 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -81,6 +81,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
}
+ if dn.GetMaxVolumeCount() != int64(heartbeat.MaxVolumeCount) {
+ delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount()
+ dn.UpAdjustMaxVolumeCountDelta(delta)
+ }
+
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{
Url: dn.Url(),
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 1f4d9df10..517eb4bc0 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -80,8 +80,13 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
doneChan <- err
return
}
- if in.GetVolumeSizeLimit() != 0 {
+ if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
+ if vs.store.MaybeAdjustVolumeMax() {
+ if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ }
+ }
}
if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index f15303282..3c8a7b864 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -275,3 +275,19 @@ func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) {
return nil, false
}
+
+func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
+
+ l.volumesLock.RLock()
+ defer l.volumesLock.RUnlock()
+
+ for _, vol := range l.volumes {
+ if vol.IsReadOnly() {
+ continue
+ }
+ datSize, idxSize, _ := vol.FileStat()
+ unUsedSpace += volumeSizeLimit - (datSize + idxSize)
+ }
+
+ return
+}
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index f6c44e966..72d3e2b3e 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -183,3 +183,10 @@ func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[ne
}
return deltaVols
}
+
+func (l *DiskLocation) EcVolumesLen() int {
+ l.ecVolumesLock.RLock()
+ defer l.ecVolumesLock.RUnlock()
+
+ return len(l.ecVolumes)
+}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 3d9aa2cff..eef53765f 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -152,6 +152,13 @@ func (ev *EcVolume) ShardSize() int64 {
return 0
}
+func (ev *EcVolume) Size() (size int64) {
+ for _, shard := range ev.Shards {
+ size += shard.Size()
+ }
+ return
+}
+
func (ev *EcVolume) CreatedAt() time.Time {
return ev.ecxCreatedAt
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 76fe4de27..4ef3682d8 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -99,6 +100,9 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
+ currentFreeCount *= erasure_coding.DataShardsCount
+ currentFreeCount -= location.EcVolumesLen()
+ currentFreeCount /= erasure_coding.DataShardsCount
if currentFreeCount > max {
max = currentFreeCount
ret = location
@@ -382,3 +386,24 @@ func (s *Store) SetVolumeSizeLimit(x uint64) {
func (s *Store) GetVolumeSizeLimit() uint64 {
return atomic.LoadUint64(&s.volumeSizeLimit)
}
+
+func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
+ volumeSizeLimit := s.GetVolumeSizeLimit()
+ for _, diskLocation := range s.Locations {
+ if diskLocation.MaxVolumeCount == 0 {
+ diskStatus := stats.NewDiskStatus(diskLocation.Directory)
+ unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
+ unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
+ volCount := diskLocation.VolumesLen()
+ maxVolumeCount := volCount
+ if unclaimedSpaces > int64(volumeSizeLimit) {
+ maxVolumeCount += int(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
+ }
+ diskLocation.MaxVolumeCount = maxVolumeCount
+ glog.V(0).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%d/MB",
+ diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024)
+ hasChanges = true
+ }
+ }
+ return
+}