diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/backend/disk_file.go | 5 | ||||
| -rw-r--r-- | weed/storage/disk_location.go | 10 | ||||
| -rw-r--r-- | weed/storage/erasure_coding/ec_encoder.go | 2 | ||||
| -rw-r--r-- | weed/storage/store.go | 3 | ||||
| -rw-r--r-- | weed/storage/volume.go | 15 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum.go | 21 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum_test.go | 7 | ||||
| -rw-r--r-- | weed/storage/volume_write_test.go | 7 |
8 files changed, 46 insertions, 24 deletions
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 3b42429cf..cd5207356 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -70,7 +70,10 @@ func (df *DiskFile) Close() error { } func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) { - return df.fileSize, df.modTime, nil + if df.File == nil { + err = os.ErrInvalid + } + return df.fileSize, df.modTime, err } func (df *DiskFile) Name() string { diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index af4ec1eb4..d618db296 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -317,6 +317,16 @@ func (l *DiskLocation) VolumesLen() int { return len(l.volumes) } +func (l *DiskLocation) SetStopping() { + l.volumesLock.Lock() + for _, v := range l.volumes { + v.SetStopping() + } + l.volumesLock.Unlock() + + return +} + func (l *DiskLocation) Close() { l.volumesLock.Lock() for _, v := range l.volumes { diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index 34b639407..157149865 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -220,7 +220,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi processedSize += largeBlockSize * DataShardsCount } for remainingSize > 0 { - encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs) + err = encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs) if err != nil { return fmt.Errorf("failed to encode small chunk data: %v", err) } diff --git a/weed/storage/store.go b/weed/storage/store.go index 8381705d6..30fe63b63 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -327,6 +327,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { func (s *Store) SetStopping() { s.isStopping = true + for _, location := range s.Locations { + location.SetStopping() + } } func (s *Store) Close() { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index c6bf3e329..14bc5f22d 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -175,6 +175,21 @@ func (v *Volume) DiskType() types.DiskType { return v.location.DiskType } +func (v *Volume) SetStopping() { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + if v.nm != nil { + if err := v.nm.Sync(); err != nil { + glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id) + } + } + if v.DataBackend != nil { + if err := v.DataBackend.Sync(); err != nil { + glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id) + } + } +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 56e8beddb..06de181b5 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -370,7 +370,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca dst backend.BackendStorageFile ) if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil { - return + return err } defer dst.Close() @@ -386,11 +386,10 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca } err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) if err != nil { - return nil + return err } - err = nm.SaveToIdx(idxName) - return + return nm.SaveToIdx(idxName) } func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) { @@ -399,7 +398,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str dataFile *os.File ) if dstDatBackend, err = backend.CreateVolumeFile(dstDatName, preallocate, 0); err != nil { - return + return err } defer dstDatBackend.Close() @@ -408,7 +407,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str newNm := needle_map.NewMemDb() defer newNm.Close() if err = oldNm.LoadFromIdx(srcIdxName); err != nil { - return + return err } if dataFile, err = os.Open(srcDatName); err != nil { return err @@ -424,7 +423,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str writeThrottler := util.NewWriteThrottler(compactionBytePerSecond) - oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { + err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { offset, size := value.Offset, value.Size @@ -441,7 +440,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str n := new(needle.Needle) err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version) if err != nil { - return nil + return fmt.Errorf("cannot hydrate needle from file: %s", err) } if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) { @@ -461,8 +460,10 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str return nil }) + if err != nil { + return err + } - newNm.SaveToIdx(datIdxName) + return newNm.SaveToIdx(datIdxName) - return } diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 0177cb64d..8212d86c7 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -2,7 +2,6 @@ package storage import ( "math/rand" - "os" "testing" "time" @@ -62,11 +61,7 @@ func TestMakeDiff(t *testing.T) { } func TestCompaction(t *testing.T) { - dir, err := os.MkdirTemp("", "example") - if err != nil { - t.Fatalf("temp dir creation: %v", err) - } - defer os.RemoveAll(dir) // clean up + dir := t.TempDir() v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) if err != nil { diff --git a/weed/storage/volume_write_test.go b/weed/storage/volume_write_test.go index 9f661a27f..11fe49358 100644 --- a/weed/storage/volume_write_test.go +++ b/weed/storage/volume_write_test.go @@ -2,7 +2,6 @@ package storage import ( "fmt" - "os" "testing" "time" @@ -12,11 +11,7 @@ import ( ) func TestSearchVolumesWithDeletedNeedles(t *testing.T) { - dir, err := os.MkdirTemp("", "example") - if err != nil { - t.Fatalf("temp dir creation: %v", err) - } - defer os.RemoveAll(dir) // clean up + dir := t.TempDir() v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) if err != nil { |
