aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author chrislu <chris.lu@gmail.com> 2022-06-05 11:54:04 -0700
committer chrislu <chris.lu@gmail.com> 2022-06-05 11:54:04 -0700
commit ecef844dfc5bab7a28b34bbe1472d1c9585c7e41 (patch)
tree 82e05a80622c184062b651b4585fd3e0094bc955
parent 48cca4e54f0626e55dbe0f503098c23d1a950a2b (diff)
download seaweedfs-ecef844dfc5bab7a28b34bbe1472d1c9585c7e41.tar.xz
seaweedfs-ecef844dfc5bab7a28b34bbe1472d1c9585c7e41.zip
stream read large files
-rw-r--r-- weed/server/volume_server_handlers_read.go 51
-rw-r--r-- weed/storage/needle/needle_read_page.go 30
-rw-r--r-- weed/storage/needle/needle_read_test.go 2
-rw-r--r-- weed/storage/store.go 15
-rw-r--r-- weed/storage/volume_read.go 80
5 files changed, 149 insertions, 29 deletions
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index b9213a15d..eb5b2be5a 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -127,6 +127,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
var count int
var needleSize types.Size
+ readOption.AttemptMetaOnly, readOption.MustMetaOnly = shouldAttemptStreamWrite(hasVolume, ext, r)
onReadSizeFn := func(size types.Size) {
needleSize = size
atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize))
@@ -218,11 +219,31 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r)
+ if !readOption.IsMetaOnly {
+ rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r)
+ if e := writeResponseContent(filename, mtype, rs, w, r); e != nil {
+ glog.V(2).Infoln("response write error:", e)
+ }
+ } else {
+ vs.streamWriteResponseContent(filename, mtype, volumeId, n, w, r, readOption)
+ }
+}
- if e := writeResponseContent(filename, mtype, rs, w, r); e != nil {
- glog.V(2).Infoln("response write error:", e)
+// shouldAttemptStreamWrite decides whether the read path may load only the
+// needle metadata so that the body can be streamed afterwards.
+//
+// Results:
+//   - (false, false) when the volume is not local, or when the request asks
+//     for an image resize according to shouldResizeImages;
+//   - (true, true) for HEAD requests;
+//   - (true, false) for every other request.
+func shouldAttemptStreamWrite(hasLocalVolume bool, ext string, r *http.Request) (shouldAttempt bool, mustMetaOnly bool) {
+	switch {
+	case !hasLocalVolume:
+		return false, false
+	case r.Method == "HEAD":
+		return true, true
+	}
+	// The extension is lower-cased before it is handed to shouldResizeImages.
+	if _, _, _, resizeRequested := shouldResizeImages(strings.ToLower(ext), r); resizeRequested {
+		return false, false
}
+	return true, false
}
func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, ext string, w http.ResponseWriter, r *http.Request) (processed bool) {
@@ -318,3 +339,27 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
})
return nil
}
+
+// streamWriteResponseContent answers a request for a needle whose data has not
+// been loaded into memory.
+//
+// It sets Content-Type (from mimeType, or from the filename extension when
+// mimeType is empty), advertises "Accept-Ranges: bytes" and applies
+// adjustPassthroughHeaders. For HEAD requests it only sets Content-Length to
+// n.DataSize and returns. Otherwise it hands processRangeRequest a callback
+// that copies the requested byte range through
+// vs.store.ReadVolumeNeedleDataInto.
+func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType string, volumeId needle.VolumeId, n *needle.Needle, w http.ResponseWriter, r *http.Request, readOption *storage.ReadOption) {
+	if len(mimeType) == 0 {
+		if suffix := filepath.Ext(filename); len(suffix) > 0 {
+			mimeType = mime.TypeByExtension(suffix)
+		}
+	}
+
+	respHeader := w.Header()
+	if len(mimeType) > 0 {
+		respHeader.Set("Content-Type", mimeType)
+	}
+	respHeader.Set("Accept-Ranges", "bytes")
+	adjustPassthroughHeaders(w, r, filename)
+
+	contentLength := int64(n.DataSize)
+	if r.Method == "HEAD" {
+		respHeader.Set("Content-Length", strconv.FormatInt(contentLength, 10))
+		return
+	}
+
+	// copyRange writes the bytes [offset, offset+size) of the needle data to writer.
+	copyRange := func(writer io.Writer, offset int64, size int64) error {
+		return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
+	}
+	processRangeRequest(r, w, contentLength, mimeType, copyRange)
+}
diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go
index 300b415c9..1fe40f847 100644
--- a/weed/storage/needle/needle_read_page.go
+++ b/weed/storage/needle/needle_read_page.go
@@ -10,26 +10,28 @@ import (
)
// ReadNeedleDataInto uses a needle without n.Data to read the content into an io.Writer
-func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, offset int64, buf []byte, writer io.Writer, expectedChecksumValue uint32) (err error) {
+func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset int64, buf []byte, writer io.Writer, needleOffset int64, size int64, expectedChecksumValue uint32) (err error) {
crc := CRC(0)
- for x := 0; ; x += len(buf) {
- count, err := n.ReadNeedleData(r, offset, buf, int64(x))
+ for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) {
+ count, err := n.ReadNeedleData(r, volumeOffset, buf, x)
+ if count > 0 {
+ crc = crc.Update(buf[0:count])
+ if _, err = writer.Write(buf[0:count]); err != nil {
+ return fmt.Errorf("ReadNeedleData write: %v", err)
+ }
+ }
if err != nil {
if err == io.EOF {
+ err = nil
break
}
return fmt.Errorf("ReadNeedleData: %v", err)
}
- if count > 0 {
- crc = crc.Update(buf[0:count])
- if _, err = writer.Write(buf[0:count]); err != nil {
- return fmt.Errorf("ReadNeedleData write: %v", err)
- }
- } else {
+ if count <= 0 {
break
}
}
- if expectedChecksumValue != crc.Value() {
+ if needleOffset == 0 && size == int64(n.DataSize) && expectedChecksumValue != crc.Value() {
return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc.Value(), expectedChecksumValue)
}
return nil
@@ -65,14 +67,18 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
return 0, err
}
n.ParseNeedleHeader(bytes)
+ if n.Size != size {
+ if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
+ return 0, ErrorSizeMismatch
+ }
+ }
+
n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
dataSize := GetActualSize(size, version)
stopOffset := offset + dataSize
metaSize := stopOffset - startOffset
- fmt.Printf("offset %d dataSize %d\n", offset, dataSize)
- fmt.Printf("read needle meta [%d,%d) size %d\n", startOffset, stopOffset, metaSize)
metaSlice := make([]byte, int(metaSize))
count, err = r.ReadAt(metaSlice, startOffset)
diff --git a/weed/storage/needle/needle_read_test.go b/weed/storage/needle/needle_read_test.go
index 688df0d53..9ffd8836e 100644
--- a/weed/storage/needle/needle_read_test.go
+++ b/weed/storage/needle/needle_read_test.go
@@ -74,7 +74,7 @@ func TestPageRead(t *testing.T) {
fmt.Printf("Checksum value %d\n", checksumValue)
buf := make([]byte, 1024)
- if err = n.ReadNeedleDataInto(datBackend, offset, buf, io.Discard, checksumValue); err != nil {
+ if err = n.ReadNeedleDataInto(datBackend, offset, buf, io.Discard, 0, int64(n.DataSize), checksumValue); err != nil {
t.Fatalf("ReadNeedleDataInto: %v", err)
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 7f1d35f33..81e69aaa2 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "io"
"path/filepath"
"strings"
"sync/atomic"
@@ -26,7 +27,13 @@ const (
)
type ReadOption struct {
- ReadDeleted bool
+ ReadDeleted bool // request: allow reading a needle whose map entry is marked deleted, unless its size is TombstoneFileSize
+ AttemptMetaOnly bool // request: readNeedle tries a metadata-only read when this is set and the needle size exceeds PagedReadLimit
+ MustMetaOnly bool // request: set to true for HEAD requests by shouldAttemptStreamWrite; NOTE(review): no reader of this flag is visible in this change
+ IsMetaOnly bool // read status: set by readNeedle when only the metadata was read (needle neither compressed nor a chunk manifest), so the data must be streamed
+ ChecksumValue uint32 // read status: value returned by ReadNeedleMeta, later passed to ReadNeedleDataInto as the expected checksum
+ VolumeRevision uint16 // read status: the volume's SuperBlock.CompactionRevision when the metadata was read; a later mismatch triggers a metadata re-read
+ IsOutOfRange bool // read status: whether need to read over MaxPossibleVolumeSize, i.e. the needle was found at its offset plus MaxPossibleVolumeSize
}
/*
@@ -375,6 +382,12 @@ func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption
}
return 0, fmt.Errorf("volume %d not found", i)
}
+// ReadVolumeNeedleDataInto locates volume i in the store and delegates to its
+// readNeedleDataInto, passing n, readOption, writer, offset and size through
+// unchanged. It returns a "volume %d not found" error when the store does not
+// hold the volume.
+func (s *Store) ReadVolumeNeedleDataInto(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) error {
+	vol := s.findVolume(i)
+	if vol == nil {
+		return fmt.Errorf("volume %d not found", i)
+	}
+	return vol.readNeedleDataInto(n, readOption, writer, offset, size)
+}
func (s *Store) GetVolume(i needle.VolumeId) *Volume {
return s.findVolume(i)
}
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index 9751b56ae..ced3fcb8f 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"time"
@@ -12,8 +13,10 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
+const PagedReadLimit = 1024 * 1024 // readNeedle attempts a metadata-only read only for needles whose size exceeds this value
+
// read fills in Needle content by looking up n.Id from NeedleMapper
-func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) {
+func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (count int, err error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
@@ -36,32 +39,85 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize
if onReadSizeFn != nil {
onReadSizeFn(readSize)
}
- err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
- if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
- err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ if readOption != nil && readOption.AttemptMetaOnly && readSize > PagedReadLimit {
+ readOption.VolumeRevision = v.SuperBlock.CompactionRevision
+ readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ readOption.IsOutOfRange = true
+ readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
+ if err != nil {
+ return 0, err
+ }
+ if !n.IsCompressed() && !n.IsChunkedManifest() {
+ readOption.IsMetaOnly = true
+ }
}
- v.checkReadWriteError(err)
- if err != nil {
- return 0, err
+ if readOption == nil || !readOption.IsMetaOnly {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
+ v.checkReadWriteError(err)
+ if err != nil {
+ return 0, err
+ }
}
- bytesRead := len(n.Data)
+ count = int(n.DataSize)
if !n.HasTtl() {
- return bytesRead, nil
+ return
}
ttlMinutes := n.Ttl.Minutes()
if ttlMinutes == 0 {
- return bytesRead, nil
+ return
}
if !n.HasLastModifiedDate() {
- return bytesRead, nil
+ return
}
if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
- return bytesRead, nil
+ return
}
return -1, ErrorNotFound
}
// read fills in Needle content by looking up n.Id from NeedleMapper
+// readNeedleDataInto writes size bytes of the needle's data, starting at the
+// needle-relative offset, to writer, without loading the data into n.Data.
+//
+// readOption is expected to carry the read status recorded by readNeedle
+// (VolumeRevision, ChecksumValue, IsOutOfRange). When readOption is nil, or
+// when its VolumeRevision no longer matches the volume's compaction revision,
+// the needle metadata is re-read first so the offset and checksum are current.
+//
+// It returns ErrorNotFound when the needle is not in the needle map,
+// ErrorDeleted when it is deleted and deleted reads are not requested, any
+// error from re-reading the metadata, or the error from ReadNeedleDataInto.
+func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) {
+	v.dataFileAccessLock.RLock()
+	defer v.dataFileAccessLock.RUnlock()
+
+	// A nil readOption has no cached read status: use defaults and force the
+	// metadata read below instead of dereferencing a nil pointer.
+	needMeta := readOption == nil
+	if readOption == nil {
+		readOption = &ReadOption{}
+	}
+
+	nv, ok := v.nm.Get(n.Id)
+	if !ok || nv.Offset.IsZero() {
+		return ErrorNotFound
+	}
+	readSize := nv.Size
+	if readSize.IsDeleted() {
+		if !readOption.ReadDeleted || readSize == TombstoneFileSize {
+			return ErrorDeleted
+		}
+		glog.V(3).Infof("reading deleted %s", n.String())
+		readSize = -readSize
+	}
+	if readSize == 0 {
+		return nil
+	}
+
+	if needMeta || readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
+		// the volume is compacted (or no status was supplied): refresh the
+		// checksum and the out-of-range flag, retrying past
+		// MaxPossibleVolumeSize the same way readNeedle does for 4-byte offsets.
+		readOption.IsOutOfRange = false
+		readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+		if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+			readOption.IsOutOfRange = true
+			readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+		}
+		if err != nil {
+			// Do not stream with a stale checksum or stale needle fields.
+			return err
+		}
+	}
+
+	buf := mem.Allocate(1024 * 1024)
+	defer mem.Free(buf)
+	actualOffset := nv.Offset.ToActualOffset()
+	if readOption.IsOutOfRange {
+		actualOffset += int64(MaxPossibleVolumeSize)
+	}
+
+	return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size, readOption.ChecksumValue)
+}
+
+// ReadNeedleBlob takes an explicit offset and size and returns the bytes read; it receives no needle, so nothing is looked up by n.Id here
func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()