aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_vacuum.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_vacuum.go')
-rw-r--r--weed/storage/volume_vacuum.go43
1 files changed, 24 insertions, 19 deletions
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 0e5130572..e90746b54 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
@@ -76,10 +77,10 @@ func (v *Volume) CommitCompact() error {
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
v.nm.Close()
- if err := v.dataFile.Close(); err != nil {
+ if err := v.DataBackend.Close(); err != nil {
glog.V(0).Infof("fail to close volume %d", v.Id)
}
- v.dataFile = nil
+ v.DataBackend = nil
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
var e error
@@ -131,8 +132,8 @@ func (v *Volume) cleanupCompact() error {
return nil
}
-func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
- superBlock, err := ReadSuperBlock(file)
+func fetchCompactRevisionFromDatFile(datBackend backend.DataStorageBackend) (compactRevision uint16, err error) {
+ superBlock, err := ReadSuperBlock(datBackend)
if err != nil {
return 0, err
}
@@ -146,7 +147,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
defer oldIdxFile.Close()
oldDatFile, err := os.Open(oldDatFileName)
- defer oldDatFile.Close()
+ oldDatBackend := backend.NewDiskFile(oldDatFile)
+ defer oldDatBackend.Close()
if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
@@ -155,7 +157,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return nil
}
- oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile)
+ oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend)
if err != nil {
return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err)
}
@@ -196,7 +198,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
return fmt.Errorf("open dat file %s failed: %v", newDatFileName, err)
}
- defer dst.Close()
+ dstDatBackend := backend.NewDiskFile(dst)
+ defer dstDatBackend.Close()
if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
@@ -204,7 +207,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
defer idx.Close()
var newDatCompactRevision uint16
- newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
+ newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dstDatBackend)
if err != nil {
return fmt.Errorf("fetchCompactRevisionFromDatFile dst %s failed: %v", dst.Name(), err)
}
@@ -235,7 +238,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
//even the needle cache in memory is hit, the need_bytes is correct
glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
var needleBytes []byte
- needleBytes, err = needle.ReadNeedleBlob(oldDatFile, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
+ needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
if err != nil {
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err)
}
@@ -247,7 +250,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
fakeDelNeedle.Id = key
fakeDelNeedle.Cookie = 0x12345678
fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano())
- _, _, _, err = fakeDelNeedle.Append(dst, v.Version())
+ _, _, _, err = fakeDelNeedle.Append(dstDatBackend, v.Version())
if err != nil {
return fmt.Errorf("append deleted %d failed: %v", key, err)
}
@@ -267,7 +270,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
type VolumeFileScanner4Vacuum struct {
version needle.Version
v *Volume
- dst *os.File
+ dstBackend backend.DataStorageBackend
nm *NeedleMap
newOffset int64
now uint64
@@ -277,7 +280,7 @@ type VolumeFileScanner4Vacuum struct {
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
scanner.version = superBlock.Version()
superBlock.CompactionRevision++
- _, err := scanner.dst.Write(superBlock.Bytes())
+ _, err := scanner.dstBackend.WriteAt(superBlock.Bytes(), 0)
scanner.newOffset = int64(superBlock.BlockSize())
return err
@@ -296,7 +299,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
- if _, _, _, err := n.Append(scanner.dst, scanner.v.Version()); err != nil {
+ if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
delta := n.DiskSize(scanner.version)
@@ -309,9 +312,10 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
- dst, idx *os.File
+ dst backend.DataStorageBackend
+ idx *os.File
)
- if dst, err = createVolumeFile(dstName, preallocate, v.MemoryMapMaxSizeMb); err != nil {
+ if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return
}
defer dst.Close()
@@ -325,7 +329,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
v: v,
now: uint64(time.Now().Unix()),
nm: NewBtreeNeedleMap(idx),
- dst: dst,
+ dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
@@ -339,7 +343,8 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
}
- defer dst.Close()
+ dstDatBackend := backend.NewDiskFile(dst)
+ defer dstDatBackend.Close()
if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
@@ -369,7 +374,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
}
n := new(needle.Needle)
- err := n.ReadData(v.dataFile, offset.ToAcutalOffset(), size, v.Version())
+ err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version())
if err != nil {
return nil
}
@@ -383,7 +388,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
- if _, _, _, err = n.Append(dst, v.Version()); err != nil {
+ if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
newOffset += n.DiskSize(v.Version())