-rw-r--r--  weed/storage/volume.go               9
-rw-r--r--  weed/storage/volume_checking.go      1
-rw-r--r--  weed/storage/volume_vacuum.go      100
-rw-r--r--  weed/storage/volume_vacuum_test.go  53
4 files changed, 132 insertions(+), 31 deletions(-)
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 258787701..c1d531376 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -10,11 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
)
-type keyField struct {
- offset uint32
- size uint32
-}
-
type Volume struct {
Id VolumeId
dir string
@@ -29,8 +24,8 @@ type Volume struct {
dataFileAccessLock sync.Mutex
lastModifiedTime uint64 //unix time in seconds
- lastCompactingIndexOffset uint64
- incrementedHasUpdatedIndexEntry map[uint64]keyField
+ lastCompactIndexOffset uint64
+ lastCompactRevision uint16
}
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
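Note on the struct change above: instead of carrying a per-key map on Volume, the volume now records only where compaction started (the index offset and the compact revision) and rebuilds the map locally inside makeupDiff. A minimal sketch of the snapshot Compact() takes, using only fields visible in this diff (the helper name is hypothetical):

// snapshotForCompact is a hypothetical helper showing what Compact() records
// before copying: makeupDiff later uses these two values to bound and
// validate the incremental catch-up.
func snapshotForCompact(v *Volume) (idxOffset uint64, revision uint16) {
	idxOffset = v.nm.IndexFileSize()        // .idx size when compaction begins
	revision = v.SuperBlock.CompactRevision // revision of the .dat being copied
	return
}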
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index d424010f1..48f707594 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -21,7 +21,6 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
}
key, offset, size := idxFileEntry(lastIdxEntry)
- //deleted index entry could not point to deleted needle
if offset == 0 {
return nil
}
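A side note on idxFileEntry, which the check above relies on: each .idx record is 16 bytes. A minimal decode sketch, assuming the big-endian byte helpers from weed/util that the vacuum code below also uses (decodeIdxEntry itself is illustrative, not part of this commit):

import "github.com/chrislusf/seaweedfs/weed/util"

// decodeIdxEntry mirrors idxFileEntry: 8-byte needle key, 4-byte offset
// (stored in NeedlePaddingSize units), 4-byte size, all big-endian.
func decodeIdxEntry(b []byte) (key uint64, offset uint32, size uint32) {
	key = util.BytesToUint64(b[0:8])
	offset = util.BytesToUint32(b[8:12])
	size = util.BytesToUint32(b[12:16])
	return
}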
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 55c248894..723300557 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -21,8 +21,9 @@ func (v *Volume) Compact() error {
//glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
- v.lastCompactingIndexOffset = v.nm.IndexFileSize()
- glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactingIndexOffset)
+ v.lastCompactIndexOffset = v.nm.IndexFileSize()
+ v.lastCompactRevision = v.SuperBlock.CompactRevision
+ glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
}
@@ -71,6 +72,21 @@ func (v *Volume) commitCompact() error {
return nil
}
+func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
+ if _, err = file.Seek(0, 0); err != nil {
+ return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
+ }
+ header := make([]byte, SuperBlockSize)
+ if _, e := file.Read(header); e != nil {
+ return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e)
+ }
+ superBlock, err := ParseSuperBlock(header)
+ if err != nil {
+ return 0, err
+ }
+ return superBlock.CompactRevision, nil
+}
+
func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
var indexSize int64
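fetchCompactRevisionFromDatFile above delegates the actual decoding to ParseSuperBlock. For orientation, a sketch of just the field it needs, assuming the 8-byte super-block layout of this SeaweedFS version (version, replica placement, TTL, compact revision); verify the offsets against super_block.go before relying on them:

import "encoding/binary"

// compactRevisionFromHeader is an illustrative stand-in for the slice of
// ParseSuperBlock used here. Assumed layout: byte 0 version, byte 1 replica
// placement, bytes 2-3 TTL, bytes 4-5 the big-endian compact revision.
func compactRevisionFromHeader(header []byte) uint16 {
	return binary.BigEndian.Uint16(header[4:6])
}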
@@ -83,56 +99,67 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
}
- if indexSize == 0 || uint64(indexSize) <= v.lastCompactingIndexOffset {
+ if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset {
return nil
}
- v.incrementedHasUpdatedIndexEntry = make(map[uint64]keyField)
- for idx_offset := indexSize; uint64(idx_offset) >= v.lastCompactingIndexOffset; idx_offset -= NeedleIndexSize {
+ oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile)
+ if err != nil {
+ return
+ }
+ if oldDatCompactRevision != v.lastCompactRevision {
+ return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision)
+ }
+
+ type keyField struct {
+ offset uint32
+ size uint32
+ }
+ incrementedHasUpdatedIndexEntry := make(map[uint64]keyField)
+
+ for idx_offset := indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize {
var IdxEntry []byte
if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil {
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
}
key, offset, size := idxFileEntry(IdxEntry)
- if _, found := v.incrementedHasUpdatedIndexEntry[key]; !found {
- v.incrementedHasUpdatedIndexEntry[key] = keyField{
+ if _, found := incrementedHasUpdatedIndexEntry[key]; !found {
+ incrementedHasUpdatedIndexEntry[key] = keyField{
offset: offset,
size: size,
}
- } else {
- continue
}
}
- if len(v.incrementedHasUpdatedIndexEntry) > 0 {
+ if len(incrementedHasUpdatedIndexEntry) > 0 {
var (
dst, idx *os.File
)
- if dst, err = os.OpenFile(newDatFileName, os.O_WRONLY, 0644); err != nil {
+ if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
return
}
defer dst.Close()
- if idx, err = os.OpenFile(newIdxFileName, os.O_WRONLY, 0644); err != nil {
+ if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
return
}
defer idx.Close()
+ var newDatCompactRevision uint16
+ newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
+ if err != nil {
+ return
+ }
+ if oldDatCompactRevision+1 != newDatCompactRevision {
+ return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
+ }
+
idx_entry_bytes := make([]byte, 16)
- for key, incre_idx_entry := range v.incrementedHasUpdatedIndexEntry {
+ for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
util.Uint64toBytes(idx_entry_bytes[0:8], key)
util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
- if _, err := idx.Seek(0, 2); err != nil {
- return fmt.Errorf("cannot seek end of indexfile %s: %v",
- newIdxFileName, err)
- }
- _, err = idx.Write(idx_entry_bytes)
-
- //even the needle cache in memory is hit, the need_bytes is correct
- needle_bytes, _, _ := ReadNeedleBlob(dst, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
-
var offset int64
if offset, err = dst.Seek(0, 2); err != nil {
glog.V(0).Infof("failed to seek the end of file: %v", err)
@@ -146,7 +173,34 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return
}
}
- dst.Write(needle_bytes)
+
+ //updated needle
+ if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
+ //read from the old .dat so needle_bytes is correct even if the in-memory needle cache has this entry
+ var needle_bytes []byte
+ needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+ if err != nil {
+ return
+ }
+ dst.Write(needle_bytes)
+ util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
+ } else { //deleted needle
+ //fakeDelNeedle's Data field defaults to nil
+ fakeDelNeedle := new(Needle)
+ fakeDelNeedle.Id = key
+ fakeDelNeedle.Cookie = 0x12345678
+ _, err = fakeDelNeedle.Append(dst, v.Version())
+ if err != nil {
+ return
+ }
+ util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+ }
+
+ if _, err := idx.Seek(0, 2); err != nil {
+ return fmt.Errorf("cannot seek end of indexfile %s: %v",
+ newIdxFileName, err)
+ }
+ _, err = idx.Write(idx_entry_bytes)
}
}
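The replay loop above reduces to: read the needle blob from the old .dat, append it at the end of the new .dat, and point the fresh .idx record at the append position in NeedlePaddingSize units (or write offset 0 plus a tombstone needle for a delete). A minimal sketch of the updated-needle case, reusing the util helpers and constants visible in this diff (the function itself is illustrative):

import (
	"os"

	"github.com/chrislusf/seaweedfs/weed/util"
)

// appendAndIndex sketches the replay of one updated needle: blob is the raw
// needle read from the old .dat via ReadNeedleBlob, dst is the new .dat
// opened O_RDWR. It returns the 16-byte .idx record for the appended copy.
func appendAndIndex(dst *os.File, key uint64, blob []byte, size uint32) ([]byte, error) {
	end, err := dst.Seek(0, 2) // append position: current end of the new .dat
	if err != nil {
		return nil, err
	}
	if _, err = dst.Write(blob); err != nil {
		return nil, err
	}
	entry := make([]byte, NeedleIndexSize)
	util.Uint64toBytes(entry[0:8], key)
	util.Uint32toBytes(entry[8:12], uint32(end/NeedlePaddingSize)) // offsets are stored in padding units
	util.Uint32toBytes(entry[12:16], size)
	return entry, nil
}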
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
new file mode 100644
index 000000000..02d1a2b56
--- /dev/null
+++ b/weed/storage/volume_vacuum_test.go
@@ -0,0 +1,53 @@
+package storage
+
+import (
+ "testing"
+)
+
+/*
+makeupDiff test steps
+1. Launch a weed server in your local/dev environment. Set the master's
+"garbageThreshold" option and the volume server's "max" option to specific
+values that make preparing the test prerequisites easier:
+ a) ./weed master -garbageThreshold=0.99 -mdir=./m
+ b) ./weed volume -dir=./data -max=1 -mserver=localhost:9333 -port=8080
+2. Upload 4 different files; you can call /dir/assign to get 4 different fids:
+ a) upload file A with fid a
+ b) upload file B with fid b
+ c) upload file C with fid c
+ d) upload file D with fid d
+3. Update files A and C:
+ a) modify file A and upload file A with fid a
+ b) modify file C and upload file C with fid c
+ c) record the current 1.idx file size (the lastCompactIndexOffset value)
+4. Compact the data file:
+ a) run curl http://localhost:8080/admin/vacuum/compact?volumeId=1
+ b) verify that 1.cpd and 1.cpx are created under the volume directory
+5. Update file B and delete file D:
+ a) modify file B and upload file B with fid b
+ b) delete file D with fid d
+6. Now run the unit test below; it should pass.
+7. Commit the compaction manually:
+ a) mv 1.cpd 1.dat
+ b) mv 1.cpx 1.idx
+8. Restart the volume server.
+9. You should now get the updated files A, B, and C.
+*/
+
+func TestMakeDiff(t *testing.T) {
+
+ v := new(Volume)
+ //lastCompactIndexOffset value is the index file size before step 4
+ v.lastCompactIndexOffset = 96
+ v.SuperBlock.version = 0x2
+ err := v.makeupDiff(
+ "/yourpath/1.cpd",
+ "/yourpath/1.cpx",
+ "/yourpath/1.dat",
+ "/yourpath/1.idx")
+ if err != nil {
+ t.Errorf("makeupDiff err is %v", err)
+ } else {
+ t.Log("makeupDiff Succeeded")
+ }
+}
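If you follow the walkthrough in the comment above, the hard-coded 96 in step 3c can instead be taken straight from the size of 1.idx recorded before compacting. A small fragment to drop into TestMakeDiff, assuming only os.Stat and the path placeholder already used above (substitute your own volume directory):

// lastCompactIndexOffset is just the byte size of the live .idx file
// recorded before /admin/vacuum/compact ran.
fi, err := os.Stat("/yourpath/1.idx")
if err != nil {
	t.Fatalf("stat idx file: %v", err)
}
v.lastCompactIndexOffset = uint64(fi.Size())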