aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/disk_location.go10
-rw-r--r--weed/storage/store.go3
-rw-r--r--weed/storage/volume.go15
-rw-r--r--weed/storage/volume_vacuum.go11
-rw-r--r--weed/storage/volume_vacuum_test.go7
-rw-r--r--weed/storage/volume_write_test.go7
6 files changed, 36 insertions, 17 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index af4ec1eb4..d618db296 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -317,6 +317,16 @@ func (l *DiskLocation) VolumesLen() int {
return len(l.volumes)
}
+func (l *DiskLocation) SetStopping() {
+ l.volumesLock.Lock()
+ for _, v := range l.volumes {
+ v.SetStopping()
+ }
+ l.volumesLock.Unlock()
+
+ return
+}
+
func (l *DiskLocation) Close() {
l.volumesLock.Lock()
for _, v := range l.volumes {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 8381705d6..30fe63b63 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -327,6 +327,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
func (s *Store) SetStopping() {
s.isStopping = true
+ for _, location := range s.Locations {
+ location.SetStopping()
+ }
}
func (s *Store) Close() {
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index c6bf3e329..14bc5f22d 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -175,6 +175,21 @@ func (v *Volume) DiskType() types.DiskType {
return v.location.DiskType
}
+func (v *Volume) SetStopping() {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ if v.nm != nil {
+ if err := v.nm.Sync(); err != nil {
+ glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id)
+ }
+ }
+ if v.DataBackend != nil {
+ if err := v.DataBackend.Sync(); err != nil {
+ glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id)
+ }
+ }
+}
+
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 3bda260d9..be7f6891f 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -389,8 +389,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
return err
}
- err = nm.SaveToIdx(idxName)
- return nil
+ return nm.SaveToIdx(idxName)
}
func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) {
@@ -424,7 +423,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
writeThrottler := util.NewWriteThrottler(compactionBytePerSecond)
- oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
offset, size := value.Offset, value.Size
@@ -461,8 +460,10 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
return nil
})
+ if err != nil {
+ return err
+ }
- newNm.SaveToIdx(datIdxName)
+ return newNm.SaveToIdx(datIdxName)
- return nil
}
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 0177cb64d..8212d86c7 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -2,7 +2,6 @@ package storage
import (
"math/rand"
- "os"
"testing"
"time"
@@ -62,11 +61,7 @@ func TestMakeDiff(t *testing.T) {
}
func TestCompaction(t *testing.T) {
- dir, err := os.MkdirTemp("", "example")
- if err != nil {
- t.Fatalf("temp dir creation: %v", err)
- }
- defer os.RemoveAll(dir) // clean up
+ dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {
diff --git a/weed/storage/volume_write_test.go b/weed/storage/volume_write_test.go
index 9f661a27f..11fe49358 100644
--- a/weed/storage/volume_write_test.go
+++ b/weed/storage/volume_write_test.go
@@ -2,7 +2,6 @@ package storage
import (
"fmt"
- "os"
"testing"
"time"
@@ -12,11 +11,7 @@ import (
)
func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
- dir, err := os.MkdirTemp("", "example")
- if err != nil {
- t.Fatalf("temp dir creation: %v", err)
- }
- defer os.RemoveAll(dir) // clean up
+ dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {