aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/storage/needle/43.datbin0 -> 1153928 bytes
-rw-r--r--weed/storage/needle/needle_read.go21
-rw-r--r--weed/storage/needle/needle_read_page.go77
-rw-r--r--weed/storage/needle/needle_read_test.go99
-rw-r--r--weed/storage/types/needle_types.go1
5 files changed, 191 insertions, 7 deletions
diff --git a/weed/storage/needle/43.dat b/weed/storage/needle/43.dat
new file mode 100644
index 000000000..7ef8c28ca
--- /dev/null
+++ b/weed/storage/needle/43.dat
Binary files differ
diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go
index 882ed0233..77c7e88dc 100644
--- a/weed/storage/needle/needle_read.go
+++ b/weed/storage/needle/needle_read.go
@@ -111,6 +111,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
}
n.Data = bytes[index : index+int(n.DataSize)]
index = index + int(n.DataSize)
+ }
+ _, err = n.readNeedleDataVersion2NonData(bytes[index:])
+ return
+}
+func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err error) {
+ lenBytes := len(bytes)
+ if index < lenBytes {
n.Flags = bytes[index]
index = index + 1
}
@@ -119,7 +126,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
index = index + 1
if int(n.NameSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
- return fmt.Errorf("index out of range %d", 2)
+ return index, fmt.Errorf("index out of range %d", 2)
}
n.Name = bytes[index : index+int(n.NameSize)]
index = index + int(n.NameSize)
@@ -129,7 +136,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
index = index + 1
if int(n.MimeSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
- return fmt.Errorf("index out of range %d", 3)
+ return index, fmt.Errorf("index out of range %d", 3)
}
n.Mime = bytes[index : index+int(n.MimeSize)]
index = index + int(n.MimeSize)
@@ -137,7 +144,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
if index < lenBytes && n.HasLastModifiedDate() {
if LastModifiedBytesLength+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
- return fmt.Errorf("index out of range %d", 4)
+ return index, fmt.Errorf("index out of range %d", 4)
}
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
index = index + LastModifiedBytesLength
@@ -145,7 +152,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
if index < lenBytes && n.HasTtl() {
if TtlBytesLength+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
- return fmt.Errorf("index out of range %d", 5)
+ return index, fmt.Errorf("index out of range %d", 5)
}
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
index = index + TtlBytesLength
@@ -153,19 +160,19 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
if index < lenBytes && n.HasPairs() {
if 2+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
- return fmt.Errorf("index out of range %d", 6)
+ return index, fmt.Errorf("index out of range %d", 6)
}
n.PairsSize = util.BytesToUint16(bytes[index : index+2])
index += 2
if int(n.PairsSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
- return fmt.Errorf("index out of range %d", 7)
+ return index, fmt.Errorf("index out of range %d", 7)
}
end := index + int(n.PairsSize)
n.Pairs = bytes[index:end]
index = end
}
- return nil
+ return index, nil
}
func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {
diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go
new file mode 100644
index 000000000..4600f4314
--- /dev/null
+++ b/weed/storage/needle/needle_read_page.go
@@ -0,0 +1,77 @@
+package needle
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+)
+
+// ReadNeedleData uses a needle without n.Data to read the content
+// volumeOffset: the offset within the volume
+// needleOffset: the offset within the needle Data
+func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64, data []byte, needleOffset int64) (count int, err error) {
+
+ sizeToRead := min(int64(len(data)), int64(n.DataSize)-needleOffset)
+ if sizeToRead <= 0 {
+ return 0, io.EOF
+ }
+ startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset
+
+ count, err = r.ReadAt(data[:sizeToRead], startOffset)
+ if err != nil {
+ fileSize, _, _ := r.GetStat()
+ glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err)
+ }
+ return
+
+}
+
+// ReadNeedleMeta fills all metadata except the n.Data
+func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (checksumValue uint32, err error) {
+
+ bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
+
+ count, err := r.ReadAt(bytes, offset)
+ if count != NeedleHeaderSize+DataSizeSize || err != nil {
+ return 0, err
+ }
+ n.ParseNeedleHeader(bytes)
+ n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
+
+ startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
+ dataSize := GetActualSize(size, version)
+ stopOffset := offset + dataSize
+ metaSize := stopOffset - startOffset
+ fmt.Printf("offset %d dataSize %d\n", offset, dataSize)
+ fmt.Printf("read needle meta [%d,%d) size %d\n", startOffset, stopOffset, metaSize)
+ metaSlice := make([]byte, int(metaSize))
+
+ count, err = r.ReadAt(metaSlice, startOffset)
+ if err != nil && int64(count) == metaSize {
+ err = nil
+ }
+ if err != nil {
+ return 0, err
+ }
+
+ var index int
+ index, err = n.readNeedleDataVersion2NonData(metaSlice)
+
+ checksumValue = util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize])
+ if version == Version3 {
+ n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
+ }
+
+ return checksumValue, err
+
+}
+
+func min(x, y int64) int64 {
+ if x < y {
+ return x
+ }
+ return y
+}
diff --git a/weed/storage/needle/needle_read_test.go b/weed/storage/needle/needle_read_test.go
new file mode 100644
index 000000000..b519b0241
--- /dev/null
+++ b/weed/storage/needle/needle_read_test.go
@@ -0,0 +1,99 @@
+package needle
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "os"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func TestPageRead(t *testing.T) {
+ baseFileName := "43"
+ offset := int64(8)
+ size := types.Size(1153890) // actual file size 1153862
+
+ datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0644)
+ if err != nil {
+ t.Fatalf("Open Volume Data File [ERROR]: %v", err)
+ }
+ datBackend := backend.NewDiskFile(datFile)
+ defer datBackend.Close()
+ {
+ n := new(Needle)
+
+ bytes, err := ReadNeedleBlob(datBackend, offset, size, Version3)
+ if err != nil {
+ t.Fatalf("readNeedleBlob: %v", err)
+ }
+ if err = n.ReadBytes(bytes, offset, size, Version3); err != nil {
+ t.Fatalf("readNeedleBlob: %v", err)
+ }
+
+ fmt.Printf("bytes len %d\n", len(bytes))
+ fmt.Printf("name %s size %d\n", n.Name, n.Size)
+
+ fmt.Printf("id %d\n", n.Id)
+ fmt.Printf("DataSize %d\n", n.DataSize)
+ fmt.Printf("Flags %v\n", n.Flags)
+ fmt.Printf("NameSize %d\n", n.NameSize)
+ fmt.Printf("MimeSize %d\n", n.MimeSize)
+ fmt.Printf("PairsSize %d\n", n.PairsSize)
+ fmt.Printf("LastModified %d\n", n.LastModified)
+ fmt.Printf("AppendAtNs %d\n", n.AppendAtNs)
+ fmt.Printf("Checksum %d\n", n.Checksum)
+ }
+
+ {
+ n, bytes, bodyLength, err := ReadNeedleHeader(datBackend, Version3, offset)
+ if err != nil {
+ t.Fatalf("ReadNeedleHeader: %v", err)
+ }
+ fmt.Printf("bytes len %d\n", len(bytes))
+ fmt.Printf("name %s size %d bodyLength:%d\n", n.Name, n.Size, bodyLength)
+ }
+
+ {
+ n := new(Needle)
+ checksumValue, err := n.ReadNeedleMeta(datBackend, offset, size, Version3)
+ if err != nil {
+ t.Fatalf("ReadNeedleHeader: %v", err)
+ }
+ fmt.Printf("name %s size %d\n", n.Name, n.Size)
+ fmt.Printf("id %d\n", n.Id)
+ fmt.Printf("DataSize %d\n", n.DataSize)
+ fmt.Printf("Flags %v\n", n.Flags)
+ fmt.Printf("NameSize %d\n", n.NameSize)
+ fmt.Printf("MimeSize %d\n", n.MimeSize)
+ fmt.Printf("PairsSize %d\n", n.PairsSize)
+ fmt.Printf("LastModified %d\n", n.LastModified)
+ fmt.Printf("AppendAtNs %d\n", n.AppendAtNs)
+ fmt.Printf("Checksum %d\n", n.Checksum)
+ fmt.Printf("Checksum value %d\n", checksumValue)
+
+ buf := make([]byte, 1024)
+ crc := CRC(0)
+ for x := int64(0); ; x += 1024 {
+ count, err := n.ReadNeedleData(datBackend, offset, buf, x)
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ t.Fatalf("ReadNeedleData: %v", err)
+ }
+ if count > 0 {
+ crc = crc.Update(buf[0:count])
+ } else {
+ break
+ }
+ }
+ fmt.Printf("read checksum value %d\n", crc.Value())
+
+ assert.Equal(t, checksumValue, crc.Value(), "validate checksum value")
+
+ }
+
+}
diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go
index 137b97d7f..98287fe6a 100644
--- a/weed/storage/types/needle_types.go
+++ b/weed/storage/types/needle_types.go
@@ -33,6 +33,7 @@ type Cookie uint32
const (
SizeSize = 4 // uint32 size
NeedleHeaderSize = CookieSize + NeedleIdSize + SizeSize
+ DataSizeSize = 4
NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8