aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/backend/disk_file.go5
-rw-r--r--weed/storage/disk_location.go10
-rw-r--r--weed/storage/erasure_coding/ec_encoder.go2
-rw-r--r--weed/storage/store.go3
-rw-r--r--weed/storage/volume.go15
-rw-r--r--weed/storage/volume_vacuum.go21
-rw-r--r--weed/storage/volume_vacuum_test.go7
-rw-r--r--weed/storage/volume_write_test.go7
8 files changed, 46 insertions, 24 deletions
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 3b42429cf..cd5207356 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -70,7 +70,10 @@ func (df *DiskFile) Close() error {
}
func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
- return df.fileSize, df.modTime, nil
+ if df.File == nil {
+ err = os.ErrInvalid
+ }
+ return df.fileSize, df.modTime, err
}
func (df *DiskFile) Name() string {
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index af4ec1eb4..d618db296 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -317,6 +317,16 @@ func (l *DiskLocation) VolumesLen() int {
return len(l.volumes)
}
+func (l *DiskLocation) SetStopping() {
+ l.volumesLock.Lock()
+ for _, v := range l.volumes {
+ v.SetStopping()
+ }
+ l.volumesLock.Unlock()
+
+ return
+}
+
func (l *DiskLocation) Close() {
l.volumesLock.Lock()
for _, v := range l.volumes {
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 34b639407..157149865 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -220,7 +220,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
processedSize += largeBlockSize * DataShardsCount
}
for remainingSize > 0 {
- encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs)
+ err = encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs)
if err != nil {
return fmt.Errorf("failed to encode small chunk data: %v", err)
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 8381705d6..30fe63b63 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -327,6 +327,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
func (s *Store) SetStopping() {
s.isStopping = true
+ for _, location := range s.Locations {
+ location.SetStopping()
+ }
}
func (s *Store) Close() {
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index c6bf3e329..14bc5f22d 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -175,6 +175,21 @@ func (v *Volume) DiskType() types.DiskType {
return v.location.DiskType
}
+func (v *Volume) SetStopping() {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ if v.nm != nil {
+ if err := v.nm.Sync(); err != nil {
+ glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id)
+ }
+ }
+ if v.DataBackend != nil {
+ if err := v.DataBackend.Sync(); err != nil {
+ glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id)
+ }
+ }
+}
+
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 56e8beddb..06de181b5 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -370,7 +370,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
dst backend.BackendStorageFile
)
if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
- return
+ return err
}
defer dst.Close()
@@ -386,11 +386,10 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
if err != nil {
- return nil
+ return err
}
- err = nm.SaveToIdx(idxName)
- return
+ return nm.SaveToIdx(idxName)
}
func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) {
@@ -399,7 +398,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
dataFile *os.File
)
if dstDatBackend, err = backend.CreateVolumeFile(dstDatName, preallocate, 0); err != nil {
- return
+ return err
}
defer dstDatBackend.Close()
@@ -408,7 +407,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
newNm := needle_map.NewMemDb()
defer newNm.Close()
if err = oldNm.LoadFromIdx(srcIdxName); err != nil {
- return
+ return err
}
if dataFile, err = os.Open(srcDatName); err != nil {
return err
@@ -424,7 +423,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
writeThrottler := util.NewWriteThrottler(compactionBytePerSecond)
- oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
offset, size := value.Offset, value.Size
@@ -441,7 +440,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
n := new(needle.Needle)
err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version)
if err != nil {
- return nil
+ return fmt.Errorf("cannot hydrate needle from file: %s", err)
}
if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
@@ -461,8 +460,10 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
return nil
})
+ if err != nil {
+ return err
+ }
- newNm.SaveToIdx(datIdxName)
+ return newNm.SaveToIdx(datIdxName)
- return
}
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 0177cb64d..8212d86c7 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -2,7 +2,6 @@ package storage
import (
"math/rand"
- "os"
"testing"
"time"
@@ -62,11 +61,7 @@ func TestMakeDiff(t *testing.T) {
}
func TestCompaction(t *testing.T) {
- dir, err := os.MkdirTemp("", "example")
- if err != nil {
- t.Fatalf("temp dir creation: %v", err)
- }
- defer os.RemoveAll(dir) // clean up
+ dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {
diff --git a/weed/storage/volume_write_test.go b/weed/storage/volume_write_test.go
index 9f661a27f..11fe49358 100644
--- a/weed/storage/volume_write_test.go
+++ b/weed/storage/volume_write_test.go
@@ -2,7 +2,6 @@ package storage
import (
"fmt"
- "os"
"testing"
"time"
@@ -12,11 +11,7 @@ import (
)
func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
- dir, err := os.MkdirTemp("", "example")
- if err != nil {
- t.Fatalf("temp dir creation: %v", err)
- }
- defer os.RemoveAll(dir) // clean up
+ dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {