aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/storage/needle_read_write.go2
-rw-r--r--weed/storage/volume_vacuum.go130
-rw-r--r--weed/storage/volume_vacuum_test.go64
3 files changed, 107 insertions, 89 deletions
diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go
index ee7cc6046..4241f0758 100644
--- a/weed/storage/needle_read_write.go
+++ b/weed/storage/needle_read_write.go
@@ -162,7 +162,7 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version
}
n.ParseNeedleHeader(bytes)
if n.Size != size {
- return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
+ return fmt.Errorf("File Entry Not Found. Needle id %d expected size %d Memory %d", n.Id, n.Size, size)
}
switch version {
case Version1:
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index a9fe6c03d..9171cadfb 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -119,7 +119,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile)
if err != nil {
- return
+ return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err)
}
if oldDatCompactRevision != v.lastCompactRevision {
return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision)
@@ -137,6 +137,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
}
key, offset, size := idxFileEntry(IdxEntry)
+ glog.V(0).Infof("key %d offset %d size %d", key, offset, size)
if _, found := incrementedHasUpdatedIndexEntry[key]; !found {
incrementedHasUpdatedIndexEntry[key] = keyField{
offset: offset,
@@ -145,77 +146,82 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
}
- if len(incrementedHasUpdatedIndexEntry) > 0 {
- var (
- dst, idx *os.File
- )
- if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
- return
- }
- defer dst.Close()
+ // no updates during commit step
+ if len(incrementedHasUpdatedIndexEntry) == 0 {
+ return nil
+ }
- if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
- return
- }
- defer idx.Close()
+ // deal with updates during commit step
+ var (
+ dst, idx *os.File
+ )
+ if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
+ return fmt.Errorf("open dat file %s failed: %v", newDatFileName, err)
+ }
+ defer dst.Close()
- var newDatCompactRevision uint16
- newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
- if err != nil {
- return
- }
- if oldDatCompactRevision+1 != newDatCompactRevision {
- return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
- }
+ if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
+ return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
+ }
+ defer idx.Close()
+
+ var newDatCompactRevision uint16
+ newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
+ if err != nil {
+ return fmt.Errorf("fetchCompactRevisionFromDatFile dst %s failed: %v", dst.Name(), err)
+ }
+ if oldDatCompactRevision+1 != newDatCompactRevision {
+ return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
+ }
- idx_entry_bytes := make([]byte, 16)
- for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
- util.Uint64toBytes(idx_entry_bytes[0:8], key)
- util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
- util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
+ idx_entry_bytes := make([]byte, 16)
+ for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
+ util.Uint64toBytes(idx_entry_bytes[0:8], key)
+ util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
+ util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
- var offset int64
- if offset, err = dst.Seek(0, 2); err != nil {
- glog.V(0).Infof("failed to seek the end of file: %v", err)
+ var offset int64
+ if offset, err = dst.Seek(0, 2); err != nil {
+ glog.V(0).Infof("failed to seek the end of file: %v", err)
+ return
+ }
+ //ensure file writing starting from aligned positions
+ if offset%NeedlePaddingSize != 0 {
+ offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+ if offset, err = v.dataFile.Seek(offset, 0); err != nil {
+ glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
return
}
- //ensure file writing starting from aligned positions
- if offset%NeedlePaddingSize != 0 {
- offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
- if offset, err = v.dataFile.Seek(offset, 0); err != nil {
- glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
- return
- }
- }
+ }
- //updated needle
- if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
- //even the needle cache in memory is hit, the need_bytes is correct
- var needle_bytes []byte
- needle_bytes, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
- if err != nil {
- return
- }
- dst.Write(needle_bytes)
- util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
- } else { //deleted needle
- //fakeDelNeedle 's default Data field is nil
- fakeDelNeedle := new(Needle)
- fakeDelNeedle.Id = key
- fakeDelNeedle.Cookie = 0x12345678
- _, _, err = fakeDelNeedle.Append(dst, v.Version())
- if err != nil {
- return
- }
- util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+ //updated needle
+ if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 && incre_idx_entry.size != TombstoneFileSize {
+ //even the needle cache in memory is hit, the need_bytes is correct
+ glog.V(0).Infof("file %d offset %d size %d", key, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+ var needle_bytes []byte
+ needle_bytes, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+ if err != nil {
+ return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size, err)
}
-
- if _, err := idx.Seek(0, 2); err != nil {
- return fmt.Errorf("cannot seek end of indexfile %s: %v",
- newIdxFileName, err)
+ dst.Write(needle_bytes)
+ util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
+ } else { //deleted needle
+ //fakeDelNeedle 's default Data field is nil
+ fakeDelNeedle := new(Needle)
+ fakeDelNeedle.Id = key
+ fakeDelNeedle.Cookie = 0x12345678
+ _, _, err = fakeDelNeedle.Append(dst, v.Version())
+ if err != nil {
+ return fmt.Errorf("append deleted %d failed: %v", key, err)
}
- _, err = idx.Write(idx_entry_bytes)
+ util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+ }
+
+ if _, err := idx.Seek(0, 2); err != nil {
+ return fmt.Errorf("cannot seek end of indexfile %s: %v",
+ newIdxFileName, err)
}
+ _, err = idx.Write(idx_entry_bytes)
}
return nil
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index c685102f2..428be14b7 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -69,36 +69,21 @@ func TestCompaction(t *testing.T) {
t.Fatalf("volume creation: %v", err)
}
- FILE_COUNT := 234
+ beforeCommitFileCount := 10
+ afterCommitFileCount := 10
- infos := make([]*needleInfo, FILE_COUNT)
+ infos := make([]*needleInfo, beforeCommitFileCount+afterCommitFileCount)
- for i := 1; i <= FILE_COUNT; i++ {
- n := newRandomNeedle(uint64(i))
- size, err := v.writeNeedle(n)
- if err != nil {
- t.Fatalf("write file %d: %v", i, err)
- }
- infos[i-1] = &needleInfo{
- size: size,
- crc: n.Checksum,
- }
-
- println("written file", i, "checksum", n.Checksum.Value(), "size", size)
-
- if rand.Float64() < 0.5 {
- toBeDeleted := rand.Intn(i) + 1
- oldNeedle := newEmptyNeedle(uint64(toBeDeleted))
- v.deleteNeedle(oldNeedle)
- println("deleted file", toBeDeleted)
- infos[toBeDeleted-1] = &needleInfo{
- size: 0,
- crc: n.Checksum,
- }
- }
+ for i := 1; i <= beforeCommitFileCount; i++ {
+ doSomeWritesDeletes(i, v, t, infos)
}
v.Compact(0)
+
+ for i := 1; i <= afterCommitFileCount; i++ {
+ doSomeWritesDeletes(i+beforeCommitFileCount, v, t, infos)
+ }
+
v.commitCompact()
v.Close()
@@ -108,7 +93,12 @@ func TestCompaction(t *testing.T) {
t.Fatalf("volume reloading: %v", err)
}
- for i := 1; i <= FILE_COUNT; i++ {
+ for i := 1; i <= beforeCommitFileCount+afterCommitFileCount; i++ {
+
+ if infos[i-1] == nil {
+ t.Fatal("not found file", i)
+ continue
+ }
if infos[i-1].size == 0 {
continue
@@ -129,6 +119,28 @@ func TestCompaction(t *testing.T) {
}
}
+func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
+ n := newRandomNeedle(uint64(i))
+ size, err := v.writeNeedle(n)
+ if err != nil {
+ t.Fatalf("write file %d: %v", i, err)
+ }
+ infos[i-1] = &needleInfo{
+ size: size,
+ crc: n.Checksum,
+ }
+ println("written file", i, "checksum", n.Checksum.Value(), "size", size)
+ if rand.Float64() < 0.5 {
+ toBeDeleted := rand.Intn(i) + 1
+ oldNeedle := newEmptyNeedle(uint64(toBeDeleted))
+ v.deleteNeedle(oldNeedle)
+ println("deleted file", toBeDeleted)
+ infos[toBeDeleted-1] = &needleInfo{
+ size: 0,
+ crc: n.Checksum,
+ }
+ }
+}
type needleInfo struct {
size uint32