aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/storage/volume_read.go9
1 files changed, 8 insertions, 1 deletions
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index 34b75660e..e18ce9016 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -164,6 +164,13 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
toWrite := min(count, int(offset+size-x))
if toWrite > 0 {
crc = crc.Update(buf[0:toWrite])
+ if offset == 0 && size == int64(n.DataSize) && int64(count) == size && (n.Checksum != crc) {
+ // This check works only if the buffer is big enough to hold the whole needle data
+ // and we ask for all needle data.
+ // Otherwise we cannot check the validity of partially aquired data.
+ stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc()
+ return fmt.Errorf("ReadNeedleData checksum %v expected %v for Needle: %v,%v", crc, n.Checksum, v.Id, n)
+ }
if _, err = writer.Write(buf[0:toWrite]); err != nil {
return fmt.Errorf("ReadNeedleData write: %v", err)
}
@@ -182,7 +189,7 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
if offset == 0 && size == int64(n.DataSize) && (n.Checksum != crc && uint32(n.Checksum) != crc.Value()) {
// the crc.Value() function is to be deprecated. this double checking is for backward compatible.
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc()
- return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc, n.Checksum)
+ return fmt.Errorf("ReadNeedleData checksum %v expected %v for Needle: %v,%v", crc, n.Checksum, v.Id, n)
}
return nil