aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_vacuum.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_vacuum.go')
-rw-r--r--weed/storage/volume_vacuum.go32
1 files changed, 16 insertions, 16 deletions
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index b3dcdbd9d..704f1f4ef 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -115,6 +115,7 @@ func (v *Volume) CommitCompact() error {
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
return e
}
+ return nil
}
func (v *Volume) cleanupCompact() error {
@@ -270,7 +271,7 @@ type VolumeFileScanner4Vacuum struct {
version needle.Version
v *Volume
dstBackend backend.BackendStorageFile
- nm *NeedleMap
+ nm *needle_map.MemDb
newOffset int64
now uint64
writeThrottler *util.WriteThrottler
@@ -295,7 +296,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize {
- if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
+ if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil {
@@ -312,32 +313,33 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
dst backend.BackendStorageFile
- idx *os.File
)
if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return
}
defer dst.Close()
- if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
- return
- }
- defer idx.Close()
+ nm := needle_map.NewMemDb()
scanner := &VolumeFileScanner4Vacuum{
v: v,
now: uint64(time.Now().Unix()),
- nm: NewBtreeNeedleMap(idx),
+ nm: nm,
dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
+ if err != nil {
+ return nil
+ }
+
+ err = nm.SaveToIdx(idxName)
return
}
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
var (
- dst, idx, oldIndexFile *os.File
+ dst, oldIndexFile *os.File
)
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
@@ -345,17 +347,13 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
dstDatBackend := backend.NewDiskFile(dst)
defer dstDatBackend.Close()
- if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
- return
- }
- defer idx.Close()
-
if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
return
}
defer oldIndexFile.Close()
- nm := NewBtreeNeedleMap(idx)
+ nm := needle_map.NewMemDb()
+
now := uint64(time.Now().Unix())
v.SuperBlock.CompactionRevision++
@@ -384,7 +382,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if nv.Offset == offset && nv.Size > 0 {
- if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil {
+ if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
@@ -396,5 +394,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
return nil
})
+ nm.SaveToIdx(idxName)
+
return
}