diff options
Diffstat (limited to 'weed/storage/volume_checking.go')
| -rw-r--r-- | weed/storage/volume_checking.go | 65 |
1 files changed, 54 insertions, 11 deletions
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index e42fb238b..78c693f5c 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -11,17 +12,28 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) { +func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) { var indexSize int64 - if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil { + return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err) } if indexSize == 0 { return 0, nil } + for i := 1; i <= 10; i++ { + // check and fix last 10 entries + lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize) + if err != ErrorSizeMismatch { + break + } + } + return +} + +func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) { var lastIdxEntry []byte - if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil { - return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil { + return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err) } key, offset, size := idx.IdxFileEntry(lastIdxEntry) if offset.IsZero() { @@ -29,15 +41,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin } if size < 0 { // read the deletion entry - if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil { + return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err) } } else { - if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil { + return lastAppendAtNs, err } } - return + return lastAppendAtNs, nil } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { @@ -60,7 +72,38 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err } func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) { - n := new(needle.Needle) + n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset) + if err != nil { + return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset) + } + if n.Size != size { + return 0, ErrorSizeMismatch + } + if v == needle.Version3 { + bytes := make([]byte, TimestampSize) + _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + if err != nil { + return 0, fmt.Errorf("check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err) + } + n.AppendAtNs = util.BytesToUint64(bytes) + fileTailOffset := offset + needle.GetActualSize(size, v) + fileSize, _, err := datFile.GetStat() + if err != nil { + return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err) + } + if fileSize == fileTailOffset { + return n.AppendAtNs, nil + } + if fileSize > fileTailOffset { + glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset) + err = datFile.Truncate(fileTailOffset) + if err == nil { + return n.AppendAtNs, nil + } + return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err) + } + glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset) + } if err = n.ReadData(datFile, offset, size, v); err != nil { return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err) } |
