diff options
Diffstat (limited to 'weed/storage/volume_read.go')
| -rw-r--r-- | weed/storage/volume_read.go | 80 |
1 files changed, 68 insertions, 12 deletions
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index 9751b56ae..ced3fcb8f 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" "time" @@ -12,8 +13,10 @@ import ( . "github.com/chrislusf/seaweedfs/weed/storage/types" ) +const PagedReadLimit = 1024 * 1024 + // read fills in Needle content by looking up n.Id from NeedleMapper -func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) { +func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (count int, err error) { v.dataFileAccessLock.RLock() defer v.dataFileAccessLock.RUnlock() @@ -36,32 +39,85 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize if onReadSizeFn != nil { onReadSizeFn(readSize) } - err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) - if err == needle.ErrorSizeMismatch && OffsetSize == 4 { - err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + if readOption != nil && readOption.AttemptMetaOnly && readSize > PagedReadLimit { + readOption.VolumeRevision = v.SuperBlock.CompactionRevision + readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + readOption.IsOutOfRange = true + readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + } + if err != nil { + return 0, err + } + if !n.IsCompressed() && !n.IsChunkedManifest() { + readOption.IsMetaOnly = true + } } - v.checkReadWriteError(err) - if err != nil { - return 0, err + if readOption == nil || !readOption.IsMetaOnly { + err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + } + v.checkReadWriteError(err) + if err != nil { + return 0, err + } } - bytesRead := len(n.Data) + count = int(n.DataSize) if !n.HasTtl() { - return bytesRead, nil + return } ttlMinutes := n.Ttl.Minutes() if ttlMinutes == 0 { - return bytesRead, nil + return } if !n.HasLastModifiedDate() { - return bytesRead, nil + return } if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) { - return bytesRead, nil + return } return -1, ErrorNotFound } // read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + nv, ok := v.nm.Get(n.Id) + if !ok || nv.Offset.IsZero() { + return ErrorNotFound + } + readSize := nv.Size + if readSize.IsDeleted() { + if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize { + glog.V(3).Infof("reading deleted %s", n.String()) + readSize = -readSize + } else { + return ErrorDeleted + } + } + if readSize == 0 { + return nil + } + + if readOption.VolumeRevision != v.SuperBlock.CompactionRevision { + // the volume is compacted + readOption.IsOutOfRange = false + readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) + } + buf := mem.Allocate(1024 * 1024) + defer mem.Free(buf) + actualOffset := nv.Offset.ToActualOffset() + if readOption.IsOutOfRange { + actualOffset += int64(MaxPossibleVolumeSize) + } + + return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size, readOption.ChecksumValue) +} + +// read fills in Needle content by looking up n.Id from NeedleMapper func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) { v.dataFileAccessLock.RLock() defer v.dataFileAccessLock.RUnlock() |
