aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_read.go')
-rw-r--r--weed/storage/volume_read.go34
1 files changed, 29 insertions, 5 deletions
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index e9c7cf4cb..e045137b4 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -101,9 +102,18 @@ func (v *Volume) readNeedleMetaAt(n *needle.Needle, offset int64, size int32) (e
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) {
- v.dataFileAccessLock.RLock()
+ if !readOption.HasSlowRead {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+ }
+
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RLock()
+ }
nv, ok := v.nm.Get(n.Id)
- v.dataFileAccessLock.RUnlock()
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RUnlock()
+ }
if !ok || nv.Offset.IsZero() {
return ErrorNotFound
@@ -133,20 +143,26 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
crc := needle.CRC(0)
for x := offset; x < offset+size; x += int64(len(buf)) {
- v.dataFileAccessLock.RLock()
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RLock()
+ }
// possibly re-read needle offset if volume is compacted
if readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
// the volume is compacted
nv, ok = v.nm.Get(n.Id)
if !ok || nv.Offset.IsZero() {
- v.dataFileAccessLock.RUnlock()
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RUnlock()
+ }
return ErrorNotFound
}
actualOffset = nv.Offset.ToActualOffset()
readOption.VolumeRevision = v.SuperBlock.CompactionRevision
}
count, err := n.ReadNeedleData(v.DataBackend, actualOffset, buf, x)
- v.dataFileAccessLock.RUnlock()
+ if readOption.HasSlowRead {
+ v.dataFileAccessLock.RUnlock()
+ }
toWrite := min(count, int(offset+size-x))
if toWrite > 0 {
@@ -174,6 +190,14 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
}
+func synchronizedRead(rwLock *sync.RWMutex, enabled bool, closure func() error) error {
+ if enabled {
+ rwLock.RLock()
+ defer rwLock.RUnlock()
+ }
+ return closure()
+}
+
func min(x, y int) int {
if x < y {
return x