aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/store.go28
-rw-r--r--weed/storage/volume_checking.go8
-rw-r--r--weed/storage/volume_loading.go2
3 files changed, 25 insertions, 13 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 2d9ecbe32..e34f0d79a 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -57,7 +57,8 @@ type ReadOption struct {
type Store struct {
MasterAddress pb.ServerAddress
grpcDialOption grpc.DialOption
- volumeSizeLimit uint64 // read from the master
+ volumeSizeLimit uint64 // read from the master
+ preallocate atomic.Bool // read from the master
Ip string
Port int
GrpcPort int
@@ -470,14 +471,16 @@ func (s *Store) HasVolume(i needle.VolumeId) bool {
return v != nil
}
-func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
+func (s *Store) MarkVolumeReadonly(i needle.VolumeId, persist bool) error {
v := s.findVolume(i)
if v == nil {
return fmt.Errorf("volume %d not found", i)
}
v.noWriteLock.Lock()
v.noWriteOrDelete = true
- v.PersistReadOnly(true)
+ if persist {
+ v.PersistReadOnly(true)
+ }
v.noWriteLock.Unlock()
return nil
}
@@ -607,6 +610,14 @@ func (s *Store) GetVolumeSizeLimit() uint64 {
return atomic.LoadUint64(&s.volumeSizeLimit)
}
+func (s *Store) SetPreallocate(x bool) {
+ s.preallocate.Store(x)
+}
+
+func (s *Store) GetPreallocate() bool {
+ return s.preallocate.Load()
+}
+
func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
volumeSizeLimit := s.GetVolumeSizeLimit()
if volumeSizeLimit == 0 {
@@ -617,10 +628,15 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
if diskLocation.OriginalMaxVolumeCount == 0 {
currentMaxVolumeCount := atomic.LoadInt32(&diskLocation.MaxVolumeCount)
diskStatus := stats.NewDiskStatus(diskLocation.Directory)
- unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
- unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
+ var unusedSpace uint64 = 0
+ unclaimedSpaces := int64(diskStatus.Free)
+ if !s.GetPreallocate() {
+ unusedSpace = diskLocation.UnUsedSpace(volumeSizeLimit)
+ unclaimedSpaces -= int64(unusedSpace)
+ }
volCount := diskLocation.VolumesLen()
- maxVolumeCount := int32(volCount)
+ ecShardCount := diskLocation.EcShardCount()
+ maxVolumeCount := int32(volCount) + int32((ecShardCount+erasure_coding.DataShardsCount)/erasure_coding.DataShardsCount)
if unclaimedSpaces > int64(volumeSizeLimit) {
maxVolumeCount += int32(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
}
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index f5ceffcce..9bd432f85 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -14,7 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
-func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
+func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
var indexSize int64
if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
@@ -35,11 +35,7 @@ func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAt
}
}
if healthyIndexSize < indexSize {
- glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize)
- err = indexFile.Truncate(healthyIndexSize)
- if err != nil {
- glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: %v", indexFile.Name(), indexSize, healthyIndexSize, err)
- }
+ return 0, fmt.Errorf("CheckVolumeDataIntegrity %s failed: index size %d differs from healthy size %d", indexFile.Name(), indexSize, healthyIndexSize)
}
return
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 7064fdc03..919b6e2a0 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -135,7 +135,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
// capactiy overloading.
if !v.HasRemoteFile() {
glog.V(0).Infof("checking volume data integrity for volume %d", v.Id)
- if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
+ if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
v.noWriteOrDelete = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
}