aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-01-08 09:45:03 -0800
committerChris Lu <chris.lu@gmail.com>2020-01-08 09:45:03 -0800
commitacf7ca7b93e0a33e9fc987dec600d1b9f1dc1e32 (patch)
tree7f55e4c8baa41098df36e676c048d3c4588959cb
parent943f4986ef27fda2487eff0669ecf79faaaacda1 (diff)
downloadseaweedfs-acf7ca7b93e0a33e9fc987dec600d1b9f1dc1e32.tar.xz
seaweedfs-acf7ca7b93e0a33e9fc987dec600d1b9f1dc1e32.zip
volume: fix compaction
-rw-r--r--weed/storage/needle_map/memdb.go5
-rw-r--r--weed/storage/volume_vacuum.go38
2 files changed, 20 insertions, 23 deletions
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index 6aba6adeb..9eb4d9f56 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -89,6 +89,9 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
defer idxFile.Close()
return cm.AscendingVisit(func(value NeedleValue) error {
+ if value.Offset.IsZero() || value.Size == TombstoneFileSize {
+ return nil
+ }
_, err := idxFile.Write(value.ToBytes())
return err
})
@@ -104,7 +107,7 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error {
if offset.IsZero() || size == TombstoneFileSize {
- return nil
+ return cm.Delete(key)
}
return cm.Set(key, offset, size)
})
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 434b5989d..09bf36f7a 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -356,19 +356,17 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) {
var (
dstDatBackend backend.BackendStorageFile
- oldIndexFile *os.File
)
if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return
}
defer dstDatBackend.Close()
- if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
+ oldNm := needle_map.NewMemDb()
+ newNm := needle_map.NewMemDb()
+ if err = oldNm.LoadFromIdx(v.FileName()+".idx"); err != nil {
return
}
- defer oldIndexFile.Close()
-
- nm := needle_map.NewMemDb()
now := uint64(time.Now().Unix())
@@ -376,13 +374,11 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i
dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0)
newOffset := int64(v.SuperBlock.BlockSize())
- idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {
- if offset.IsZero() || size == TombstoneFileSize {
- return nil
- }
+ oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
+
+ offset, size := value.Offset, value.Size
- nv, ok := v.nm.Get(key)
- if !ok {
+ if offset.IsZero() || size == TombstoneFileSize {
return nil
}
@@ -396,21 +392,19 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i
return nil
}
- glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if nv.Offset == offset && nv.Size > 0 {
- if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
- return fmt.Errorf("cannot put needle: %s", err)
- }
- if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
- return fmt.Errorf("cannot append needle: %s", err)
- }
- newOffset += n.DiskSize(v.Version())
- glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+ if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
+ return fmt.Errorf("cannot put needle: %s", err)
}
+ if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
+ return fmt.Errorf("cannot append needle: %s", err)
+ }
+ newOffset += n.DiskSize(v.Version())
+ glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+
return nil
})
- nm.SaveToIdx(idxName)
+ newNm.SaveToIdx(idxName)
return
}