aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_read.go')
-rw-r--r--weed/storage/volume_read.go80
1 files changed, 68 insertions, 12 deletions
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index 9751b56ae..6b17eea0a 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"time"
@@ -12,8 +13,10 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
+const PagedReadLimit = 1024 * 1024
+
// read fills in Needle content by looking up n.Id from NeedleMapper
-func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) {
+func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (count int, err error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
@@ -36,32 +39,85 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize
if onReadSizeFn != nil {
onReadSizeFn(readSize)
}
- err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
- if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
- err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ if readOption != nil && readOption.AttemptMetaOnly && readSize > PagedReadLimit {
+ readOption.VolumeRevision = v.SuperBlock.CompactionRevision
+ err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ readOption.IsOutOfRange = true
+ err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
+ if err != nil {
+ return 0, err
+ }
+ if !n.IsCompressed() && !n.IsChunkedManifest() {
+ readOption.IsMetaOnly = true
+ }
}
- v.checkReadWriteError(err)
- if err != nil {
- return 0, err
+ if readOption == nil || !readOption.IsMetaOnly {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
+ v.checkReadWriteError(err)
+ if err != nil {
+ return 0, err
+ }
}
- bytesRead := len(n.Data)
+ count = int(n.DataSize)
if !n.HasTtl() {
- return bytesRead, nil
+ return
}
ttlMinutes := n.Ttl.Minutes()
if ttlMinutes == 0 {
- return bytesRead, nil
+ return
}
if !n.HasLastModifiedDate() {
- return bytesRead, nil
+ return
}
if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
- return bytesRead, nil
+ return
}
return -1, ErrorNotFound
}
// read fills in Needle content by looking up n.Id from NeedleMapper
+func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return ErrorNotFound
+ }
+ readSize := nv.Size
+ if readSize.IsDeleted() {
+ if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
+ glog.V(3).Infof("reading deleted %s", n.String())
+ readSize = -readSize
+ } else {
+ return ErrorDeleted
+ }
+ }
+ if readSize == 0 {
+ return nil
+ }
+
+ if readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
+ // the volume is compacted
+ readOption.IsOutOfRange = false
+ err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+ }
+ buf := mem.Allocate(1024 * 1024)
+ defer mem.Free(buf)
+ actualOffset := nv.Offset.ToActualOffset()
+ if readOption.IsOutOfRange {
+ actualOffset += int64(MaxPossibleVolumeSize)
+ }
+
+ return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size)
+}
+
+// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()