diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume.go | 8 | ||||
| -rw-r--r-- | weed/storage/needle/needle_read.go | 4 | ||||
| -rw-r--r-- | weed/storage/needle/needle_read_page.go | 2 | ||||
| -rw-r--r-- | weed/storage/store.go | 8 | ||||
| -rw-r--r-- | weed/storage/volume_read.go | 18 | ||||
| -rw-r--r-- | weed/storage/volume_read_test.go | 91 |
6 files changed, 125 insertions, 6 deletions
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 5e1d1121b..aa1e15722 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -210,10 +210,14 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle. return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err) } - shard := ev.Shards[0] + intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version))) + return +} +func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) { + shard := ev.Shards[0] // calculate the locations in the ec shards - intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version))) + intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset, types.Size(needle.GetActualSize(size, version))) return } diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go index 5e347833c..55d0cc893 100644 --- a/weed/storage/needle/needle_read.go +++ b/weed/storage/needle/needle_read.go @@ -209,8 +209,8 @@ func NeedleBodyLength(needleSize Size, version Version) int64 { return int64(needleSize) + NeedleChecksumSize + int64(PaddingLength(needleSize, version)) } -//n should be a needle already read the header -//the input stream will read until next file entry +// n should be a needle already read the header +// the input stream will read until next file entry func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, offset int64, bodyLength int64) (bytes []byte, err error) { if bodyLength <= 0 { diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go index 05b2fa9b0..b06622607 100644 --- a/weed/storage/needle/needle_read_page.go +++ b/weed/storage/needle/needle_read_page.go @@ -74,7 +74,6 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size return ErrorSizeMismatch } } - n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize]) startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize) @@ -90,7 +89,6 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size if err != nil { return err } - var index int index, err = n.readNeedleDataVersion2NonData(metaSlice) diff --git a/weed/storage/store.go b/weed/storage/store.go index e12fb2c50..0727cba95 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -384,6 +384,14 @@ func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption } return 0, fmt.Errorf("volume %d not found", i) } + +func (s *Store) ReadVolumeNeedleMetaAt(i needle.VolumeId, n *needle.Needle, offset int64, size int32) error { + if v := s.findVolume(i); v != nil { + return v.readNeedleMetaAt(n, offset, size) + } + return fmt.Errorf("volume %d not found", i) +} + func (s *Store) ReadVolumeNeedleDataInto(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) error { if v := s.findVolume(i); v != nil { return v.readNeedleDataInto(n, readOption, writer, offset, size) diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index d4d795fee..b51ab5c82 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -80,6 +80,24 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize return -1, ErrorNotFound } +// read needle at a specific offset +func (v *Volume) readNeedleMetaAt(n *needle.Needle, offset int64, size int32) (err error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + // read deleted meta data + if size < 0 { + size = -size + } + err = n.ReadNeedleMeta(v.DataBackend, offset, Size(size), v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + err = n.ReadNeedleMeta(v.DataBackend, offset+int64(MaxPossibleVolumeSize), Size(size), v.Version()) + } + if err != nil { + return err + } + return nil +} + // read fills in Needle content by looking up n.Id from NeedleMapper func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) { v.dataFileAccessLock.RLock() diff --git a/weed/storage/volume_read_test.go b/weed/storage/volume_read_test.go new file mode 100644 index 000000000..f1d0aa2fc --- /dev/null +++ b/weed/storage/volume_read_test.go @@ -0,0 +1,91 @@ +package storage + +import ( + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + type WriteInfo struct { + offset int64 + size int32 + } + writeInfos := make([]WriteInfo, 30) + mockLastUpdateTime := uint64(1000000000000) + // initialize 20 needles then update first 10 needles + for i := 1; i <= 30; i++ { + n := newRandomNeedle(uint64(i % 20)) + n.Flags = 0x08 + n.LastModified = mockLastUpdateTime + mockLastUpdateTime += 2000 + offset, _, _, err := v.writeNeedle2(n, true, false) + if err != nil { + t.Fatalf("write needle %d: %v", i, err) + } + writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(n.Size)} + } + expectedLastUpdateTime := uint64(1000000000000) + for i := 0; i < 30; i++ { + testNeedle := new(needle.Needle) + testNeedle.Id = types.Uint64ToNeedleId(uint64(i + 1%20)) + testNeedle.Flags = 0x08 + v.readNeedleMetaAt(testNeedle, writeInfos[i].offset, writeInfos[i].size) + actualLastModifiedTime := testNeedle.LastModified + assert.Equal(t, expectedLastUpdateTime, actualLastModifiedTime, "The two words should be the same.") + expectedLastUpdateTime += 2000 + } +} + +func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + type WriteInfo struct { + offset int64 + size int32 + } + writeInfos := make([]WriteInfo, 10) + mockLastUpdateTime := uint64(1000000000000) + for i := 1; i <= 10; i++ { + n := newRandomNeedle(uint64(i % 5)) + n.Flags = 0x08 + n.LastModified = mockLastUpdateTime + mockLastUpdateTime += 2000 + offset, _, _, err := v.writeNeedle2(n, true, false) + if err != nil { + t.Fatalf("write needle %d: %v", i, err) + } + if i < 5 { + size, err := v.deleteNeedle2(n) + if err != nil { + t.Fatalf("delete needle %d: %v", i, err) + } + writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(size)} + } else { + writeInfos[i-1] = WriteInfo{offset: int64(offset), size: int32(n.Size)} + } + } + + expectedLastUpdateTime := uint64(1000000000000) + for i := 0; i < 10; i++ { + testNeedle := new(needle.Needle) + testNeedle.Id = types.Uint64ToNeedleId(uint64(i + 1%5)) + testNeedle.Flags = 0x08 + v.readNeedleMetaAt(testNeedle, writeInfos[i].offset, writeInfos[i].size) + actualLastModifiedTime := testNeedle.LastModified + assert.Equal(t, expectedLastUpdateTime, actualLastModifiedTime, "The two words should be the same.") + expectedLastUpdateTime += 2000 + } +} |
