about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go6
-rw-r--r--weed/storage/backend/disk_file.go7
-rw-r--r--weed/storage/erasure_coding/ec_shard.go7
-rw-r--r--weed/storage/erasure_coding/ec_volume.go6
-rw-r--r--weed/storage/needle/needle_read.go10
-rw-r--r--weed/storage/needle/needle_read_page.go6
-rw-r--r--weed/storage/needle_map_metric.go6
-rw-r--r--weed/storage/store_ec.go8
-rw-r--r--weed/storage/super_block/super_block_read.go.go8
-rw-r--r--weed/storage/volume_checking.go12
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk.go18
11 files changed, 73 insertions, 21 deletions
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index 350821757..bdd941117 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -4,6 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
+ "io"
"os"
"sync"
)
@@ -130,7 +131,10 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
- if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
+ if n, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
+ if err == io.EOF && n == int(logicStop-logicStart) {
+ err = nil
+ }
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
break
}
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 68ffbd7e7..070f79865 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -3,6 +3,7 @@ package backend
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
+ "io"
"os"
"runtime"
"time"
@@ -43,7 +44,11 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) {
if df.File == nil {
return 0, os.ErrClosed
}
- return df.File.ReadAt(p, off)
+ n, err = df.File.ReadAt(p, off)
+ if err == io.EOF && n == len(p) {
+ err = nil
+ }
+ return
}
func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
index 9fcb11525..0a7f4d09a 100644
--- a/weed/storage/erasure_coding/ec_shard.go
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -2,6 +2,7 @@ package erasure_coding
import (
"fmt"
+ "io"
"os"
"path"
"strconv"
@@ -93,6 +94,10 @@ func (shard *EcVolumeShard) Destroy() {
func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {
- return shard.ecdFile.ReadAt(buf, offset)
+ n, err := shard.ecdFile.ReadAt(buf, offset)
+ if err == io.EOF && n == len(buf) {
+ err = nil
+ }
+ return n, err
}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index d3a76b561..385d30abe 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -255,8 +255,10 @@ func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId t
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
for l < h {
m := (l + h) / 2
- if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
- return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
+ if n, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
+ if n != types.NeedleMapEntrySize {
+ return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
+ }
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {
diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go
index f8468e9e2..1907efad3 100644
--- a/weed/storage/needle/needle_read.go
+++ b/weed/storage/needle/needle_read.go
@@ -196,6 +196,9 @@ func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int6
var count int
count, err = r.ReadAt(bytes, offset)
+ if err == io.EOF && count == NeedleHeaderSize {
+ err = nil
+ }
if count <= 0 || err != nil {
return nil, bytes, 0, err
}
@@ -230,7 +233,12 @@ func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, o
return nil, nil
}
bytes = make([]byte, bodyLength)
- if _, err = r.ReadAt(bytes, offset); err != nil {
+ readCount, err := r.ReadAt(bytes, offset)
+ if err == io.EOF && int64(readCount) == bodyLength {
+ err = nil
+ }
+ if err != nil {
+ glog.Errorf("%s read %d bodyLength %d offset %d: %v", r.Name(), readCount, bodyLength, offset, err)
return
}
diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go
index 36ddc3320..4e1032de8 100644
--- a/weed/storage/needle/needle_read_page.go
+++ b/weed/storage/needle/needle_read_page.go
@@ -21,6 +21,9 @@ func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64
startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset
count, err = r.ReadAt(data[:sizeToRead], startOffset)
+ if err == io.EOF && int64(count) == sizeToRead {
+ err = nil
+ }
if err != nil {
fileSize, _, _ := r.GetStat()
glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err)
@@ -40,6 +43,9 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
count, err := r.ReadAt(bytes, offset)
+ if err == io.EOF && count == NeedleHeaderSize+DataSizeSize {
+ err = nil
+ }
if count != NeedleHeaderSize+DataSizeSize || err != nil {
return err
}
diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go
index 7669180ba..d6d0a8730 100644
--- a/weed/storage/needle_map_metric.go
+++ b/weed/storage/needle_map_metric.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "io"
"os"
"sync/atomic"
@@ -152,8 +153,11 @@ func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key
remainingCount := entryCount - nextBatchSize
for remainingCount >= 0 {
- _, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
+ n, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
// glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e)
+ if e == io.EOF && n == int(NeedleMapEntrySize*nextBatchSize) {
+ e = nil
+ }
if e != nil {
return e
}
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 7f4d9797a..014a870db 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -201,9 +201,11 @@ func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasur
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
data = make([]byte, interval.Size)
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
- if _, err = shard.ReadAt(data, actualOffset); err != nil {
- glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
- return
+ if n, err = shard.ReadAt(data, actualOffset); err != nil {
+ if n != interval.Size {
+ glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
+ return
+ }
}
} else {
ecVolume.ShardLocationsLock.RLock()
diff --git a/weed/storage/super_block/super_block_read.go.go b/weed/storage/super_block/super_block_read.go.go
index fd1a874dd..dba1cba72 100644
--- a/weed/storage/super_block/super_block_read.go.go
+++ b/weed/storage/super_block/super_block_read.go.go
@@ -15,9 +15,11 @@ import (
func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) {
header := make([]byte, SuperBlockSize)
- if _, e := datBackend.ReadAt(header, 0); e != nil {
- err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
- return
+ if n, e := datBackend.ReadAt(header, 0); e != nil {
+ if n != SuperBlockSize {
+ err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
+ return
+ }
}
superBlock.Version = needle.Version(header[0])
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index 9bd432f85..6d2335f70 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -80,7 +80,11 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
return
}
bytes = make([]byte, NeedleMapEntrySize)
- _, err = indexFile.ReadAt(bytes, offset)
+ var readCount int
+ readCount, err = indexFile.ReadAt(bytes, offset)
+ if err == io.EOF && readCount == NeedleMapEntrySize {
+ err = nil
+ }
return
}
@@ -97,7 +101,11 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
}
if v == needle.Version3 {
bytes := make([]byte, TimestampSize)
- _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
+ var readCount int
+ readCount, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
+ if err == io.EOF && readCount == TimestampSize {
+ err = nil
+ }
if err == io.EOF {
return 0, err
}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
index 3968f62e9..87f05d399 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -110,8 +110,10 @@ func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
}
data := make([]byte, nv.Size)
if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()); readErr != nil {
- return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
- v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr)
+ if readSize != int(nv.Size) {
+ return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
+ v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr)
+ }
} else {
if readSize != int(nv.Size) {
return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)
@@ -133,8 +135,10 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin
}
data := make([]byte, wanted)
if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); readErr != nil {
- return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
- v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, readErr)
+ if readSize != wanted {
+ return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
+ v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, readErr)
+ }
} else {
if readSize != wanted {
return nil, fmt.Errorf("read %d, expected %d", readSize, wanted)
@@ -155,8 +159,10 @@ func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, of
return 0, ErrorOutOfBounds
}
if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil {
- return n, fmt.Errorf("read %s.dat [%d,%d): %v",
- v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err)
+ if n != wanted {
+ return n, fmt.Errorf("read %s.dat [%d,%d): %v",
+ v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err)
+ }
} else {
if n != wanted {
return n, fmt.Errorf("read %d, expected %d", n, wanted)