aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-19 01:56:38 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-19 01:56:38 -0700
commit0be2d51c96e4c2c05cad5ac3d6104ccb7e1adeb6 (patch)
tree6b5f3ed7ceb32941db2e7999a162a07da8fc543c
parentac2727853f94c8fc4e301a995997f02b70047853 (diff)
downloadseaweedfs-0be2d51c96e4c2c05cad5ac3d6104ccb7e1adeb6.tar.xz
seaweedfs-0be2d51c96e4c2c05cad5ac3d6104ccb7e1adeb6.zip
read volume lastAppendAtNs when loading a volume
-rw-r--r--weed/storage/disk_location.go2
-rw-r--r--weed/storage/needle/needle_read_write.go14
-rw-r--r--weed/storage/volume_checking.go34
-rw-r--r--weed/storage/volume_loading.go2
4 files changed, 28 insertions, 24 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 6bfb1ff76..a4a3c519e 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -57,9 +57,11 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM
size, _, _ := v.FileStat()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
+ // println("volume", vid, "last append at", v.lastAppendAtNs)
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}
+
}
}
}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index d1df7d4e2..e1ad075b4 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -185,15 +185,17 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version
case Version2, Version3:
err = n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
}
- if size == 0 || err != nil {
+ if err != nil && err != io.EOF{
return err
}
- checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
- newChecksum := NewCRC(n.Data)
- if checksum != newChecksum.Value() {
- return errors.New("CRC error! Data On Disk Corrupted")
+ if size > 0 {
+ checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
+ newChecksum := NewCRC(n.Data)
+ if checksum != newChecksum.Value() {
+ return errors.New("CRC error! Data On Disk Corrupted")
+ }
+ n.Checksum = newChecksum
}
- n.Checksum = newChecksum
if version == Version3 {
tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize])
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index 9599885ec..980656823 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -9,28 +9,29 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
+func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) {
var indexSize int64
- var e error
if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
- return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
+ return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
}
if indexSize == 0 {
- return nil
+ return 0,nil
}
var lastIdxEntry []byte
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
- return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
+ return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
}
key, offset, size := IdxFileEntry(lastIdxEntry)
- if offset.IsZero() || size == TombstoneFileSize {
- return nil
+ if offset.IsZero() {
+ return 0,nil
}
- if e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
- return fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if size == TombstoneFileSize {
+ size = 0
}
-
- return nil
+ if lastAppendAtNs, e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ }
+ return
}
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
@@ -52,14 +53,13 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
return
}
-func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) error {
+func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
n := new(needle.Needle)
- err := n.ReadData(datFile, offset, size, v)
- if err != nil {
- return err
+ if err = n.ReadData(datFile, offset, size, v); err != nil {
+ return n.AppendAtNs, err
}
if n.Id != key {
- return fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
+ return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
}
- return nil
+ return n.AppendAtNs, err
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index eb82ffe98..cfc59eb09 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -73,7 +73,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
}
}
- if e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
+ if v.lastAppendAtNs, e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
v.readOnly = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
}