aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author霍晓栋 <huoxd@jiedaibao.com>2016-09-29 13:57:23 +0800
committer霍晓栋 <huoxd@jiedaibao.com>2016-09-29 13:57:23 +0800
commited848425c77ccca0a9d2f30c7f631bc50f28cd32 (patch)
treed61f53245889899a92aaf3b0255e4ff71551d3c1
parentdffad65f2f3b1e87c7ac5274065730cabceeee99 (diff)
downloadseaweedfs-ed848425c77ccca0a9d2f30c7f631bc50f28cd32.tar.xz
seaweedfs-ed848425c77ccca0a9d2f30c7f631bc50f28cd32.zip
supplement data written between compacting and committing the compaction
-rw-r--r--weed/storage/volume.go8
-rw-r--r--weed/storage/volume_vacuum.go110
2 files changed, 110 insertions, 8 deletions
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 801dfe267..258787701 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -10,6 +10,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
)
+type keyField struct { // keyField holds the offset and size decoded from one index entry, stored per needle key.
+ offset uint32 // multiplied by NeedlePaddingSize to obtain a byte position in the data file
+ size uint32 // needle size passed to ReadNeedleBlob when copying the needle
+}
+
type Volume struct {
Id VolumeId
dir string
@@ -23,6 +28,9 @@ type Volume struct {
dataFileAccessLock sync.Mutex
lastModifiedTime uint64 //unix time in seconds
+
+ lastCompactingIndexOffset uint64
+ incrementedHasUpdatedIndexEntry map[uint64]keyField
}
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 51d74e311..55c248894 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (v *Volume) garbageLevel() float64 {
@@ -20,7 +21,8 @@ func (v *Volume) Compact() error {
//glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
- glog.V(3).Infof("creating copies for volume %d ...", v.Id)
+ v.lastCompactingIndexOffset = v.nm.IndexFileSize()
+ glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactingIndexOffset)
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
}
@@ -38,14 +40,28 @@ func (v *Volume) commitCompact() error {
glog.V(3).Infof("Got Committing lock...")
v.nm.Close()
_ = v.dataFile.Close()
- makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx")
+
var e error
- if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
- return e
- }
- if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
- return e
+ if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
+ glog.V(0).Infof("makeupDiff in commitCompact failed %v", e)
+ e = os.Remove(v.FileName() + ".cpd")
+ if e != nil {
+ return e
+ }
+ e = os.Remove(v.FileName() + ".cpx")
+ if e != nil {
+ return e
+ }
+ } else {
+ var e error
+ if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
+ return e
+ }
+ if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
+ return e
+ }
}
+
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
glog.V(3).Infof("Loading Commit file...")
@@ -55,7 +71,85 @@ func (v *Volume) commitCompact() error {
return nil
}
-func makeupDiff(newDatFile, newIdxFile, oldDatFile, oldIdxFile string) (err error) {
+// makeupDiff replays onto the compacted files (newDatFileName/newIdxFileName)
+// the index entries that were appended to oldIdxFileName at or after
+// v.lastCompactingIndexOffset, i.e. after Compact recorded the index size.
+//
+// The old index is scanned from its last entry backwards so that only the
+// most recent entry per key is kept. For each such entry the needle bytes are
+// read from the OLD data file, appended to the new data file at a
+// NeedlePaddingSize-aligned position, and an index entry pointing at that new
+// position is appended to the new index file.
+//
+// It returns nil when nothing was appended since the recorded offset, and a
+// non-nil error as soon as any open, read, seek or write fails.
+func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
+	oldIdxFile, err := os.Open(oldIdxFileName)
+	if err != nil {
+		return fmt.Errorf("open index file %s: %v", oldIdxFileName, err)
+	}
+	defer oldIdxFile.Close()
+
+	oldDatFile, err := os.Open(oldDatFileName)
+	if err != nil {
+		return fmt.Errorf("open data file %s: %v", oldDatFileName, err)
+	}
+	defer oldDatFile.Close()
+
+	var indexSize int64
+	if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
+		return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
+	}
+	lastOffset := int64(v.lastCompactingIndexOffset)
+	if indexSize == 0 || indexSize <= lastOffset {
+		return nil
+	}
+
+	// Walk backwards from the LAST entry (indexSize-NeedleIndexSize, not
+	// indexSize, which is end-of-file). The comparison is signed so the loop
+	// terminates once the offset drops below zero when lastOffset is 0.
+	updated := make(map[uint64]keyField)
+	v.incrementedHasUpdatedIndexEntry = updated
+	for idxOffset := indexSize - NeedleIndexSize; idxOffset >= lastOffset; idxOffset -= NeedleIndexSize {
+		var entry []byte
+		if entry, err = readIndexEntryAtOffset(oldIdxFile, idxOffset); err != nil {
+			return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idxOffset, err)
+		}
+		key, offset, size := idxFileEntry(entry)
+		// Newest entry wins: later (already seen) entries shadow older ones.
+		if _, found := updated[key]; !found {
+			updated[key] = keyField{offset: offset, size: size}
+		}
+	}
+	if len(updated) == 0 {
+		return nil
+	}
+
+	dst, err := os.OpenFile(newDatFileName, os.O_WRONLY, 0644)
+	if err != nil {
+		return fmt.Errorf("open data file %s: %v", newDatFileName, err)
+	}
+	defer dst.Close()
+
+	idx, err := os.OpenFile(newIdxFileName, os.O_WRONLY, 0644)
+	if err != nil {
+		return fmt.Errorf("open index file %s: %v", newIdxFileName, err)
+	}
+	defer idx.Close()
+
+	entryBytes := make([]byte, 16)
+	for key, field := range updated {
+		newOffset := field.offset
+		// NOTE(review): an entry with offset 0 or size 0 is presumably a
+		// deletion marker with no needle data to copy; it is replayed into the
+		// new index unchanged. Confirm against the needle map's Delete path.
+		if field.offset != 0 && field.size != 0 {
+			// The entry's offset refers to the old data file, so the needle
+			// must be read from there (dst is write-only and compacted).
+			var needleBytes []byte
+			if needleBytes, _, err = ReadNeedleBlob(oldDatFile, int64(field.offset)*NeedlePaddingSize, field.size); err != nil {
+				return fmt.Errorf("read needle %d from %s: %v", key, oldDatFileName, err)
+			}
+
+			var end int64
+			if end, err = dst.Seek(0, 2); err != nil {
+				return fmt.Errorf("cannot seek end of datafile %s: %v", newDatFileName, err)
+			}
+			// Ensure the needle starts at an aligned position in dst.
+			if pad := end % NeedlePaddingSize; pad != 0 {
+				end += NeedlePaddingSize - pad
+				if _, err = dst.Seek(end, 0); err != nil {
+					return fmt.Errorf("failed to align in datafile %s: %v", newDatFileName, err)
+				}
+			}
+			if _, err = dst.Write(needleBytes); err != nil {
+				return fmt.Errorf("write needle %d to %s: %v", key, newDatFileName, err)
+			}
+			// The new index must point at where the needle now lives.
+			newOffset = uint32(end / NeedlePaddingSize)
+		}
+
+		util.Uint64toBytes(entryBytes[0:8], key)
+		util.Uint32toBytes(entryBytes[8:12], newOffset)
+		util.Uint32toBytes(entryBytes[12:16], field.size)
+
+		// Index entry is written only after the data write succeeded.
+		if _, err = idx.Seek(0, 2); err != nil {
+			return fmt.Errorf("cannot seek end of indexfile %s: %v", newIdxFileName, err)
+		}
+		if _, err = idx.Write(entryBytes); err != nil {
+			return fmt.Errorf("write index entry for %d to %s: %v", key, newIdxFileName, err)
+		}
+	}
+
return nil
}