aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2016-10-08 10:12:50 -0700
committerGitHub <noreply@github.com>2016-10-08 10:12:50 -0700
commit79fdea0de4176dc44b3728987c0aa4b33b725503 (patch)
tree6a61f67775f8a739cadd15a7c3d91f752b2b69de
parentdffad65f2f3b1e87c7ac5274065730cabceeee99 (diff)
parent7d73bbb07399cc504ae2ebdcfdc164dc01295916 (diff)
downloadseaweedfs-79fdea0de4176dc44b3728987c0aa4b33b725503.tar.xz
seaweedfs-79fdea0de4176dc44b3728987c0aa4b33b725503.zip
Merge pull request #375 from hxiaodon/master
supplemental data between compacting and commit compacting
-rw-r--r--weed/storage/volume.go3
-rw-r--r--weed/storage/volume_checking.go1
-rw-r--r--weed/storage/volume_vacuum.go164
-rw-r--r--weed/storage/volume_vacuum_test.go55
4 files changed, 214 insertions, 9 deletions
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 801dfe267..c1d531376 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -23,6 +23,9 @@ type Volume struct {
dataFileAccessLock sync.Mutex
lastModifiedTime uint64 //unix time in seconds
+
+ lastCompactIndexOffset uint64
+ lastCompactRevision uint16
}
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index d424010f1..48f707594 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -21,7 +21,6 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
}
key, offset, size := idxFileEntry(lastIdxEntry)
- //deleted index entry could not point to deleted needle
if offset == 0 {
return nil
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 51d74e311..723300557 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (v *Volume) garbageLevel() float64 {
@@ -20,7 +21,9 @@ func (v *Volume) Compact() error {
//glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
- glog.V(3).Infof("creating copies for volume %d ...", v.Id)
+ v.lastCompactIndexOffset = v.nm.IndexFileSize()
+ v.lastCompactRevision = v.SuperBlock.CompactRevision
+ glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
}
@@ -38,14 +41,28 @@ func (v *Volume) commitCompact() error {
glog.V(3).Infof("Got Committing lock...")
v.nm.Close()
_ = v.dataFile.Close()
- makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx")
+
var e error
- if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
- return e
- }
- if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
- return e
+ if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
+ glog.V(0).Infof("makeupDiff in commitCompact failed %v", e)
+ e = os.Remove(v.FileName() + ".cpd")
+ if e != nil {
+ return e
+ }
+ e = os.Remove(v.FileName() + ".cpx")
+ if e != nil {
+ return e
+ }
+ } else {
+ var e error
+ if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
+ return e
+ }
+ if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
+ return e
+ }
}
+
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
glog.V(3).Infof("Loading Commit file...")
@@ -55,7 +72,138 @@ func (v *Volume) commitCompact() error {
return nil
}
-func makeupDiff(newDatFile, newIdxFile, oldDatFile, oldIdxFile string) (err error) {
+func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
+ if _, err = file.Seek(0, 0); err != nil {
+ return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
+ }
+ header := make([]byte, SuperBlockSize)
+ if _, e := file.Read(header); e != nil {
+ return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e)
+ }
+ superBlock, err := ParseSuperBlock(header)
+ if err != nil {
+ return 0, err
+ }
+ return superBlock.CompactRevision, nil
+}
+
+func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
+ var indexSize int64
+
+ oldIdxFile, err := os.Open(oldIdxFileName)
+ defer oldIdxFile.Close()
+
+ oldDatFile, err := os.Open(oldDatFileName)
+ defer oldDatFile.Close()
+
+ if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
+ return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
+ }
+ if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset {
+ return nil
+ }
+
+ oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile)
+ if err != nil {
+ return
+ }
+ if oldDatCompactRevision != v.lastCompactRevision {
+ return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision)
+ }
+
+ type keyField struct {
+ offset uint32
+ size uint32
+ }
+ incrementedHasUpdatedIndexEntry := make(map[uint64]keyField)
+
+ for idx_offset := indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize {
+ var IdxEntry []byte
+ if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil {
+ return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
+ }
+ key, offset, size := idxFileEntry(IdxEntry)
+ if _, found := incrementedHasUpdatedIndexEntry[key]; !found {
+ incrementedHasUpdatedIndexEntry[key] = keyField{
+ offset: offset,
+ size: size,
+ }
+ }
+ }
+
+ if len(incrementedHasUpdatedIndexEntry) > 0 {
+ var (
+ dst, idx *os.File
+ )
+ if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
+ return
+ }
+ defer dst.Close()
+
+ if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
+ return
+ }
+ defer idx.Close()
+
+ var newDatCompactRevision uint16
+ newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
+ if err != nil {
+ return
+ }
+ if oldDatCompactRevision+1 != newDatCompactRevision {
+ return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
+ }
+
+ idx_entry_bytes := make([]byte, 16)
+ for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
+ util.Uint64toBytes(idx_entry_bytes[0:8], key)
+ util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
+ util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
+
+ var offset int64
+ if offset, err = dst.Seek(0, 2); err != nil {
+ glog.V(0).Infof("failed to seek the end of file: %v", err)
+ return
+ }
+ //ensure file writing starting from aligned positions
+ if offset%NeedlePaddingSize != 0 {
+ offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+ if offset, err = v.dataFile.Seek(offset, 0); err != nil {
+ glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
+ return
+ }
+ }
+
+ //updated needle
+ if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
+ //even if the needle cache in memory is hit, the needle_bytes is correct
+ var needle_bytes []byte
+ needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+ if err != nil {
+ return
+ }
+ dst.Write(needle_bytes)
+ util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
+ } else { //deleted needle
+ //fakeDelNeedle 's default Data field is nil
+ fakeDelNeedle := new(Needle)
+ fakeDelNeedle.Id = key
+ fakeDelNeedle.Cookie = 0x12345678
+ _, err = fakeDelNeedle.Append(dst, v.Version())
+ if err != nil {
+ return
+ }
+ util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+ }
+
+ if _, err := idx.Seek(0, 2); err != nil {
+ return fmt.Errorf("cannot seek end of indexfile %s: %v",
+ newIdxFileName, err)
+ }
+ _, err = idx.Write(idx_entry_bytes)
+ }
+ }
+
return nil
}
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
new file mode 100644
index 000000000..c2fac6ce8
--- /dev/null
+++ b/weed/storage/volume_vacuum_test.go
@@ -0,0 +1,55 @@
+package storage
+
+import (
+ "testing"
+)
+
+/*
+makeupDiff test steps
+1. launch weed server in your local/dev environment (the master option
+"garbageThreshold" and the volume option "max" should be set to specific values, which makes
+preparing the test prerequisites easier)
+ a) ./weed master -garbageThreshold=0.99 -mdir=./m
+ b) ./weed volume -dir=./data -max=1 -mserver=localhost:9333 -port=8080
+2. upload 4 different files, you could call dir/assign to get 4 different fids
+ a) upload file A with fid a
+ b) upload file B with fid b
+ c) upload file C with fid c
+ d) upload file D with fid d
+3. update file A and C
+ a) modify file A and upload file A with fid a
+ b) modify file C and upload file C with fid c
+ c) record the current 1.idx's file size(lastCompactIndexOffset value)
+4. Compacting the data file
+ a) run curl http://localhost:8080/admin/vacuum/compact?volumeId=1
+ b) verify the 1.cpd and 1.cpx is created under volume directory
+5. update file B and delete file D
+ a) modify file B and upload file B with fid b
+ b) delete file D with fid d
+6. Now you could run the following UT case, the case should be run successfully
+7. Compact commit manually
+ a) mv 1.cpd 1.dat
+ b) mv 1.cpx 1.idx
+8. Restart Volume Server
+9. Now you should get updated file A,B,C
+*/
+
+func TestMakeDiff(t *testing.T) {
+
+ v := new(Volume)
+ //lastCompactIndexOffset value is the index file size before step 4
+ v.lastCompactIndexOffset = 96
+ v.SuperBlock.version = 0x2
+ /*
+ err := v.makeupDiff(
+ "/yourpath/1.cpd",
+ "/yourpath/1.cpx",
+ "/yourpath/1.dat",
+ "/yourpath/1.idx")
+ if err != nil {
+ t.Errorf("makeupDiff err is %v", err)
+ } else {
+ t.Log("makeupDiff Succeeded")
+ }
+ */
+}