aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-09-08 19:05:31 -0700
committerchrislu <chris.lu@gmail.com>2022-09-08 19:05:31 -0700
commitd14d029c736b819ab550818852817e8452c05931 (patch)
treeff5478f92d6ea018a9b25b8fe12c153288da0ae4
parent406a80da4b87d4e984696030d8a8e5c962c1ad4b (diff)
downloadseaweedfs-d14d029c736b819ab550818852817e8452c05931.tar.xz
seaweedfs-d14d029c736b819ab550818852817e8452c05931.zip
reduce v.dataFileAccessLock lock scope
-rw-r--r--weed/storage/volume_read.go33
1 files changed, 24 insertions, 9 deletions
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index d072e0ac9..c12f2baf4 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -100,10 +100,11 @@ func (v *Volume) readNeedleMetaAt(n *needle.Needle, offset int64, size int32) (e
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) {
- v.dataFileAccessLock.RLock()
- defer v.dataFileAccessLock.RUnlock()
+ v.dataFileAccessLock.RLock()
nv, ok := v.nm.Get(n.Id)
+ v.dataFileAccessLock.RUnlock()
+
if !ok || nv.Offset.IsZero() {
return ErrorNotFound
}
@@ -120,22 +121,36 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
return nil
}
- if readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
- // the volume is compacted
- readOption.IsOutOfRange = false
- err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
- }
- buf := mem.Allocate(min(1024*1024, int(size)))
- defer mem.Free(buf)
actualOffset := nv.Offset.ToActualOffset()
if readOption.IsOutOfRange {
actualOffset += int64(MaxPossibleVolumeSize)
}
+ buf := mem.Allocate(min(1024*1024, int(size)))
+ defer mem.Free(buf)
+
// read needle data
crc := needle.CRC(0)
for x := offset; x < offset+size; x += int64(len(buf)) {
+
+ v.dataFileAccessLock.RLock()
+ // possibly re-read needle offset if volume is compacted
+ if readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
+ // the volume is compacted
+ readOption.IsOutOfRange = false
+ nv, ok = v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ v.dataFileAccessLock.RUnlock()
+ return ErrorNotFound
+ }
+ actualOffset := nv.Offset.ToActualOffset()
+ if readOption.IsOutOfRange {
+ actualOffset += int64(MaxPossibleVolumeSize)
+ }
+ }
count, err := n.ReadNeedleData(v.DataBackend, actualOffset, buf, x)
+ v.dataFileAccessLock.RUnlock()
+
toWrite := min(count, int(offset+size-x))
if toWrite > 0 {
crc = crc.Update(buf[0:toWrite])