aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_read.go')
-rw-r--r--weed/storage/volume_read.go27
1 files changed, 21 insertions, 6 deletions
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index e9c7cf4cb..8b30bd5d4 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -101,9 +101,18 @@ func (v *Volume) readNeedleMetaAt(n *needle.Needle, offset int64, size int32) (e
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) {
- v.dataFileAccessLock.RLock()
+ if !readOption.HasSlowRead {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+ }
+
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RLock()
+ }
nv, ok := v.nm.Get(n.Id)
- v.dataFileAccessLock.RUnlock()
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RUnlock()
+ }
if !ok || nv.Offset.IsZero() {
return ErrorNotFound
@@ -126,27 +135,33 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
actualOffset += int64(MaxPossibleVolumeSize)
}
- buf := mem.Allocate(min(1024*1024, int(size)))
+ buf := mem.Allocate(min(readOption.ReadBufferSize, int(size)))
defer mem.Free(buf)
// read needle data
crc := needle.CRC(0)
for x := offset; x < offset+size; x += int64(len(buf)) {
- v.dataFileAccessLock.RLock()
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RLock()
+ }
// possibly re-read needle offset if volume is compacted
if readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
// the volume is compacted
nv, ok = v.nm.Get(n.Id)
if !ok || nv.Offset.IsZero() {
- v.dataFileAccessLock.RUnlock()
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RUnlock()
+ }
return ErrorNotFound
}
actualOffset = nv.Offset.ToActualOffset()
readOption.VolumeRevision = v.SuperBlock.CompactionRevision
}
count, err := n.ReadNeedleData(v.DataBackend, actualOffset, buf, x)
- v.dataFileAccessLock.RUnlock()
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RUnlock()
+ }
toWrite := min(count, int(offset+size-x))
if toWrite > 0 {