aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/erasure_coding/ec_volume.go8
-rw-r--r--weed/storage/needle/needle_read.go4
-rw-r--r--weed/storage/needle/needle_read_page.go2
-rw-r--r--weed/storage/store.go8
-rw-r--r--weed/storage/volume_read.go18
-rw-r--r--weed/storage/volume_read_test.go91
6 files changed, 125 insertions, 6 deletions
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 5e1d1121b..aa1e15722 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -210,10 +210,14 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
}
- shard := ev.Shards[0]
+ intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
+ return
+}
+func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
+ shard := ev.Shards[0]
// calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
+ intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset, types.Size(needle.GetActualSize(size, version)))
return
}
diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go
index 5e347833c..55d0cc893 100644
--- a/weed/storage/needle/needle_read.go
+++ b/weed/storage/needle/needle_read.go
@@ -209,8 +209,8 @@ func NeedleBodyLength(needleSize Size, version Version) int64 {
return int64(needleSize) + NeedleChecksumSize + int64(PaddingLength(needleSize, version))
}
-//n should be a needle already read the header
-//the input stream will read until next file entry
+// n should be a needle already read the header
+// the input stream will read until next file entry
func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, offset int64, bodyLength int64) (bytes []byte, err error) {
if bodyLength <= 0 {
diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go
index 05b2fa9b0..b06622607 100644
--- a/weed/storage/needle/needle_read_page.go
+++ b/weed/storage/needle/needle_read_page.go
@@ -74,7 +74,6 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
return ErrorSizeMismatch
}
}
-
n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
@@ -90,7 +89,6 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
if err != nil {
return err
}
-
var index int
index, err = n.readNeedleDataVersion2NonData(metaSlice)
diff --git a/weed/storage/store.go b/weed/storage/store.go
index e12fb2c50..0727cba95 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -384,6 +384,14 @@ func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption
}
return 0, fmt.Errorf("volume %d not found", i)
}
+
+func (s *Store) ReadVolumeNeedleMetaAt(i needle.VolumeId, n *needle.Needle, offset int64, size int32) error {
+ if v := s.findVolume(i); v != nil {
+ return v.readNeedleMetaAt(n, offset, size)
+ }
+ return fmt.Errorf("volume %d not found", i)
+}
+
func (s *Store) ReadVolumeNeedleDataInto(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) error {
if v := s.findVolume(i); v != nil {
return v.readNeedleDataInto(n, readOption, writer, offset, size)
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index d4d795fee..b51ab5c82 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -80,6 +80,24 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize
return -1, ErrorNotFound
}
+// read needle at a specific offset
+func (v *Volume) readNeedleMetaAt(n *needle.Needle, offset int64, size int32) (err error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+ // read deleted meta data
+ if size < 0 {
+ size = -size
+ }
+ err = n.ReadNeedleMeta(v.DataBackend, offset, Size(size), v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ err = n.ReadNeedleMeta(v.DataBackend, offset+int64(MaxPossibleVolumeSize), Size(size), v.Version())
+ }
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) {
v.dataFileAccessLock.RLock()
diff --git a/weed/storage/volume_read_test.go b/weed/storage/volume_read_test.go
new file mode 100644
index 000000000..f1d0aa2fc
--- /dev/null
+++ b/weed/storage/volume_read_test.go
@@ -0,0 +1,91 @@
+package storage
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
+ dir := t.TempDir()
+
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
+ if err != nil {
+ t.Fatalf("volume creation: %v", err)
+ }
+ type WriteInfo struct {
+ offset int64
+ size int32
+ }
+ writeInfos := make([]WriteInfo, 30)
+ mockLastUpdateTime := uint64(1000000000000)
+ // initialize 20 needles then update first 10 needles
+ for i := 1; i <= 30; i++ {
+ n := newRandomNeedle(uint64(i % 20))
+ n.Flags = 0x08
+ n.LastModified = mockLastUpdateTime
+ mockLastUpdateTime += 2000
+ offset, _, _, err := v.writeNeedle2(n, true, false)
+ if err != nil {
+ t.Fatalf("write needle %d: %v", i, err)
+ }
+ writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(n.Size)}
+ }
+ expectedLastUpdateTime := uint64(1000000000000)
+ for i := 0; i < 30; i++ {
+ testNeedle := new(needle.Needle)
+ testNeedle.Id = types.Uint64ToNeedleId(uint64(i + 1%20))
+ testNeedle.Flags = 0x08
+ v.readNeedleMetaAt(testNeedle, writeInfos[i].offset, writeInfos[i].size)
+ actualLastModifiedTime := testNeedle.LastModified
+ assert.Equal(t, expectedLastUpdateTime, actualLastModifiedTime, "The two words should be the same.")
+ expectedLastUpdateTime += 2000
+ }
+}
+
+func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) {
+ dir := t.TempDir()
+
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
+ if err != nil {
+ t.Fatalf("volume creation: %v", err)
+ }
+ type WriteInfo struct {
+ offset int64
+ size int32
+ }
+ writeInfos := make([]WriteInfo, 10)
+ mockLastUpdateTime := uint64(1000000000000)
+ for i := 1; i <= 10; i++ {
+ n := newRandomNeedle(uint64(i % 5))
+ n.Flags = 0x08
+ n.LastModified = mockLastUpdateTime
+ mockLastUpdateTime += 2000
+ offset, _, _, err := v.writeNeedle2(n, true, false)
+ if err != nil {
+ t.Fatalf("write needle %d: %v", i, err)
+ }
+ if i < 5 {
+ size, err := v.deleteNeedle2(n)
+ if err != nil {
+ t.Fatalf("delete needle %d: %v", i, err)
+ }
+ writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(size)}
+ } else {
+ writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(n.Size)}
+ }
+ }
+
+ expectedLastUpdateTime := uint64(1000000000000)
+ for i := 0; i < 10; i++ {
+ testNeedle := new(needle.Needle)
+ testNeedle.Id = types.Uint64ToNeedleId(uint64(i + 1%5))
+ testNeedle.Flags = 0x08
+ v.readNeedleMetaAt(testNeedle, writeInfos[i].offset, writeInfos[i].size)
+ actualLastModifiedTime := testNeedle.LastModified
+ assert.Equal(t, expectedLastUpdateTime, actualLastModifiedTime, "The two words should be the same.")
+ expectedLastUpdateTime += 2000
+ }
+}