aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-06-07 22:45:07 -0700
committerchrislu <chris.lu@gmail.com>2025-06-07 22:45:07 -0700
commitb25f95df0a49d22e111dbb0fb42e60a937bb97ce (patch)
tree35964d91ed7ab3366c48529cc8ff43e863cf63ae
parentb7427c7d764aadd8bdfd4e781ec79e6792e75f08 (diff)
downloadseaweedfs-b25f95df0a49d22e111dbb0fb42e60a937bb97ce.tar.xz
seaweedfs-b25f95df0a49d22e111dbb0fb42e60a937bb97ce.zip
implement read options
-rw-r--r--weed/storage/needle/needle_read_options.go94
-rw-r--r--weed/storage/needle/needle_read_options_test.go87
2 files changed, 173 insertions, 8 deletions
diff --git a/weed/storage/needle/needle_read_options.go b/weed/storage/needle/needle_read_options.go
index 6ddc2d88e..77f0f26b8 100644
--- a/weed/storage/needle/needle_read_options.go
+++ b/weed/storage/needle/needle_read_options.go
@@ -1,8 +1,11 @@
package needle
import (
+ "io"
+
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
// NeedleReadOptions specifies which parts of the Needle to read.
@@ -13,11 +16,94 @@ type NeedleReadOptions struct {
}
// ReadFromFile reads the Needle from the backend file according to the specified options.
-// For now, this is equivalent to ReadData (reads everything).
+// - If only ReadHeader is true, only the header is read and parsed.
+// - If ReadData or ReadMeta is true, reads GetActualSize(size, version) bytes from disk (size is the logical body size).
func (n *Needle) ReadFromFile(r backend.BackendStorageFile, offset int64, size Size, version Version, opts NeedleReadOptions) error {
- // Always read header and body for now (full read)
- bytes, err := ReadNeedleBlob(r, offset, size, version)
- if err != nil {
+ if opts.ReadHeader && !opts.ReadData && !opts.ReadMeta {
+ // Only read the header
+ header := make([]byte, NeedleHeaderSize)
+ count, err := r.ReadAt(header, offset)
+ if err == io.EOF && count == NeedleHeaderSize {
+ err = nil
+ }
+ if count != NeedleHeaderSize || err != nil {
+ return err
+ }
+ n.ParseNeedleHeader(header)
+ return nil
+ }
+ if opts.ReadHeader && opts.ReadMeta && !opts.ReadData {
+ // Read header first
+ header := make([]byte, NeedleHeaderSize)
+ count, err := r.ReadAt(header, offset)
+ if err == io.EOF && count == NeedleHeaderSize {
+ err = nil
+ }
+ if count != NeedleHeaderSize || err != nil {
+ return err
+ }
+ n.ParseNeedleHeader(header)
+
+ // Now read meta fields after DataSize+Data
+ metaOffset := offset + int64(NeedleHeaderSize)
+ metaIndex := 0
+ if version == Version2 || version == Version3 {
+ // Read DataSize to know how much to skip
+ dsBuf := make([]byte, 4)
+ count, err := r.ReadAt(dsBuf, metaOffset)
+ if err == io.EOF && count == 4 {
+ err = nil
+ }
+ if count != 4 || err != nil {
+ return err
+ }
+ dataSize := int(util.BytesToUint32(dsBuf))
+ metaIndex = 4 + dataSize
+ // Read meta fields (Flags, Name, Mime, etc.)
+ metaFieldsLen := int(n.Size) - dataSize // upper bound, may be more than needed
+ metaFieldsBuf := make([]byte, metaFieldsLen)
+ count, err = r.ReadAt(metaFieldsBuf, metaOffset+int64(metaIndex))
+ if err == io.EOF && count == metaFieldsLen {
+ err = nil
+ }
+ if count <= 0 || err != nil {
+ return err
+ }
+ _, err = n.readNeedleDataVersion2NonData(metaFieldsBuf)
+ if err != nil {
+ return err
+ }
+ // Now read checksum and (for v3) appendAtNs at the end
+ endMetaOffset := offset + int64(NeedleHeaderSize) + int64(n.Size)
+ endMetaLen := NeedleChecksumSize
+ if version == Version3 {
+ endMetaLen += TimestampSize
+ }
+ endMetaBuf := make([]byte, endMetaLen)
+ count, err = r.ReadAt(endMetaBuf, endMetaOffset)
+ if err == io.EOF && count == endMetaLen {
+ err = nil
+ }
+ if count != endMetaLen || err != nil {
+ return err
+ }
+ n.Checksum = CRC(util.BytesToUint32(endMetaBuf[:NeedleChecksumSize]))
+ if version == Version3 {
+ n.AppendAtNs = util.BytesToUint64(endMetaBuf[NeedleChecksumSize : NeedleChecksumSize+TimestampSize])
+ }
+ return nil
+ }
+ // For v1, just skip Data
+ return nil
+ }
+ // Otherwise, read the full on-disk entry size
+ readLen := int(GetActualSize(size, version))
+ bytes := make([]byte, readLen)
+ count, err := r.ReadAt(bytes, offset)
+ if err == io.EOF && count == readLen {
+ err = nil
+ }
+ if count != readLen || err != nil {
return err
}
return n.ReadBytes(bytes, offset, size, version)
diff --git a/weed/storage/needle/needle_read_options_test.go b/weed/storage/needle/needle_read_options_test.go
index e50b77b59..f86bab57a 100644
--- a/weed/storage/needle/needle_read_options_test.go
+++ b/weed/storage/needle/needle_read_options_test.go
@@ -6,8 +6,6 @@ import (
"reflect"
"testing"
"time"
-
- . "github.com/seaweedfs/seaweedfs/weed/storage/types"
)
type mockBackend struct {
@@ -62,16 +60,20 @@ func TestReadFromFile_EquivalenceWithReadData(t *testing.T) {
Ttl: nil,
Pairs: []byte("key=value"),
PairsSize: 9,
- Checksum: 0xCAFEBABE,
+ Checksum: 0,
AppendAtNs: 0xDEADBEEF,
}
+ // remove the TTL bit in the flags
+ n.Flags = n.Flags &^ FlagHasTtl
+ // n.Checksum = NewCRC(n.Data)
+
buf := &bytes.Buffer{}
_, _, err := writeNeedleV2(n, 0, buf)
if err != nil {
t.Fatalf("writeNeedleV2 failed: %v", err)
}
backend := &mockBackend{data: buf.Bytes()}
- size := Size(len(buf.Bytes()) - NeedleHeaderSize - NeedleChecksumSize - int(PaddingLength(Size(len(buf.Bytes())-NeedleHeaderSize-NeedleChecksumSize), Version2)))
+ size := n.Size
// Old method
nOld := &Needle{}
@@ -90,3 +92,80 @@ func TestReadFromFile_EquivalenceWithReadData(t *testing.T) {
t.Errorf("needle mismatch: old=%+v new=%+v", nOld, nNew)
}
}
+
+func TestReadFromFile_OptionsMatrix(t *testing.T) {
+ n := &Needle{
+ Cookie: 0x12345678,
+ Id: 0x1122334455667788,
+ Data: []byte("hello world"),
+ Flags: 0xFF,
+ Name: []byte("filename.txt"),
+ Mime: []byte("text/plain"),
+ LastModified: 0x1234567890,
+ Ttl: nil,
+ Pairs: []byte("key=value"),
+ PairsSize: 9,
+ Checksum: 0,
+ AppendAtNs: 0xDEADBEEF,
+ }
+ n.Flags = n.Flags &^ FlagHasTtl
+ n.Checksum = NewCRC(n.Data)
+
+ buf := &bytes.Buffer{}
+ _, _, err := writeNeedleV2(n, 0, buf)
+ if err != nil {
+ t.Fatalf("writeNeedleV2 failed: %v", err)
+ }
+ backend := &mockBackend{data: buf.Bytes()}
+ size := n.Size
+
+ t.Run("ReadHeader only", func(t *testing.T) {
+ nHeader := &Needle{}
+ opts := NeedleReadOptions{ReadHeader: true}
+ err := nHeader.ReadFromFile(backend, 0, size, Version2, opts)
+ if err != nil {
+ t.Fatalf("ReadFromFile header only failed: %v", err)
+ }
+ if nHeader.Cookie != n.Cookie || nHeader.Id != n.Id || nHeader.Size != n.Size {
+ t.Errorf("header fields mismatch: got %+v want %+v", nHeader, n)
+ }
+ if nHeader.Data != nil || nHeader.Name != nil || nHeader.Mime != nil || nHeader.Pairs != nil || nHeader.Checksum != 0 {
+ t.Errorf("non-header fields should be zero, got %+v", nHeader)
+ }
+ })
+
+ t.Run("ReadHeader+ReadMeta", func(t *testing.T) {
+ nMeta := &Needle{}
+ opts := NeedleReadOptions{ReadHeader: true, ReadMeta: true}
+ err := nMeta.ReadFromFile(backend, 0, size, Version2, opts)
+ if err != nil {
+ t.Fatalf("ReadFromFile header+meta failed: %v", err)
+ }
+ if nMeta.Data != nil {
+ t.Errorf("Data should not be set when only meta is read")
+ }
+ if nMeta.Name == nil || nMeta.Mime == nil || nMeta.Pairs == nil {
+ t.Errorf("meta fields should be set, got %+v", nMeta)
+ }
+ if nMeta.Cookie != n.Cookie || nMeta.Id != n.Id || nMeta.Size != n.Size {
+ t.Errorf("header fields mismatch: got %+v want %+v", nMeta, n)
+ }
+ })
+
+ t.Run("ReadHeader+ReadData", func(t *testing.T) {
+ // this is the same as ReadHeader+ReadData+ReadMeta
+ })
+
+ t.Run("ReadHeader+ReadData+ReadMeta", func(t *testing.T) {
+ nFull := &Needle{}
+ opts := NeedleReadOptions{ReadHeader: true, ReadData: true, ReadMeta: true}
+ err := nFull.ReadFromFile(backend, 0, size, Version2, opts)
+ if err != nil {
+ t.Fatalf("ReadFromFile header+data+meta failed: %v", err)
+ }
+ nFull.AppendAtNs = n.AppendAtNs
+ if !reflect.DeepEqual(nFull, n) {
+ t.Errorf("needle mismatch: got %+v want %+v", nFull, n)
+ }
+ })
+}