aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/erasure_coding
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/erasure_coding')
-rw-r--r--weed/storage/erasure_coding/ec_locate.go8
-rw-r--r--weed/storage/erasure_coding/ec_test.go26
-rw-r--r--weed/storage/erasure_coding/ec_volume.go13
-rw-r--r--weed/storage/erasure_coding/ec_volume_test.go2
4 files changed, 36 insertions, 13 deletions
diff --git a/weed/storage/erasure_coding/ec_locate.go b/weed/storage/erasure_coding/ec_locate.go
index 4b092695c..72f8739ad 100644
--- a/weed/storage/erasure_coding/ec_locate.go
+++ b/weed/storage/erasure_coding/ec_locate.go
@@ -12,8 +12,8 @@ type Interval struct {
LargeBlockRowsCount int
}
-func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size types.Size) (intervals []Interval) {
- blockIndex, isLargeBlock, nLargeBlockRows, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
+func LocateData(largeBlockLength, smallBlockLength int64, shardDatSize int64, offset int64, size types.Size) (intervals []Interval) {
+ blockIndex, isLargeBlock, nLargeBlockRows, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, shardDatSize, offset)
for size > 0 {
interval := Interval{
@@ -48,9 +48,9 @@ func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset
return
}
-func locateOffset(largeBlockLength, smallBlockLength int64, datSize int64, offset int64) (blockIndex int, isLargeBlock bool, nLargeBlockRows int64, innerBlockOffset int64) {
+func locateOffset(largeBlockLength, smallBlockLength int64, shardDatSize int64, offset int64) (blockIndex int, isLargeBlock bool, nLargeBlockRows int64, innerBlockOffset int64) {
largeRowSize := largeBlockLength * DataShardsCount
- nLargeBlockRows = datSize / (largeBlockLength * DataShardsCount)
+ nLargeBlockRows = (shardDatSize-1)/ largeBlockLength
// if offset is within the large block area
if offset < nLargeBlockRows*largeRowSize {
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index fc0adbe9f..b1cc9c441 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -82,7 +82,9 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type
return fmt.Errorf("failed to read dat file: %v", err)
}
- ecData, err := readEcFile(datSize, ecFiles, offset, size)
+ ecFileStat, _ := ecFiles[0].Stat()
+
+ ecData, err := readEcFile(ecFileStat.Size(), ecFiles, offset, size)
if err != nil {
return fmt.Errorf("failed to read ec file: %v", err)
}
@@ -107,9 +109,9 @@ func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte
return data, nil
}
-func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
+func readEcFile(shardDatSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
- intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToActualOffset(), size)
+ intervals := LocateData(largeBlockSize, smallBlockSize, shardDatSize, offset.ToActualOffset(), size)
for i, interval := range intervals {
if d, e := readOneInterval(interval, ecFiles); e != nil {
@@ -132,7 +134,7 @@ func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err er
data = make([]byte, interval.Size)
err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset)
- { // do some ec testing
+ if false { // do some ec testing
ecData, err := readFromOtherEcFiles(ecFiles, int(ecFileIndex), ecFileOffset, interval.Size)
if err != nil {
return nil, fmt.Errorf("ec reconstruct error: %v", err)
@@ -191,7 +193,7 @@ func removeGeneratedFiles(baseFileName string) {
}
func TestLocateData(t *testing.T) {
- intervals := LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
+ intervals := LocateData(largeBlockSize, smallBlockSize, largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
if len(intervals) != 1 {
t.Errorf("unexpected interval size %d", len(intervals))
}
@@ -199,7 +201,7 @@ func TestLocateData(t *testing.T) {
t.Errorf("unexpected interval %+v", intervals[0])
}
- intervals = LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
+ intervals = LocateData(largeBlockSize, smallBlockSize, largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
fmt.Printf("%+v\n", intervals)
}
@@ -211,7 +213,7 @@ func (this Interval) sameAs(that Interval) bool {
}
func TestLocateData2(t *testing.T) {
- intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, 32205678320, 21479557912, 4194339)
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, 3221225472, 21479557912, 4194339)
assert.Equal(t, intervals, []Interval{
{BlockIndex: 4, InnerBlockOffset: 527128, Size: 521448, IsLargeBlock: false, LargeBlockRowsCount: 2},
{BlockIndex: 5, InnerBlockOffset: 0, Size: 1048576, IsLargeBlock: false, LargeBlockRowsCount: 2},
@@ -220,3 +222,13 @@ func TestLocateData2(t *testing.T) {
{BlockIndex: 8, InnerBlockOffset: 0, Size: 527163, IsLargeBlock: false, LargeBlockRowsCount: 2},
})
}
+
+func TestLocateData3(t *testing.T) {
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, 3221225472, 30782909808, 112568)
+ for _, interval := range intervals {
+ fmt.Printf("%+v\n", interval)
+ }
+ assert.Equal(t, intervals, []Interval{
+ {BlockIndex: 8876, InnerBlockOffset: 912752, Size: 112568, IsLargeBlock: false, LargeBlockRowsCount: 2},
+ })
+}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 81284d19f..c1b07ae17 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -39,6 +39,7 @@ type EcVolume struct {
ecjFile *os.File
ecjFileAccessLock sync.Mutex
diskType types.DiskType
+ datFileSize int64
}
func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
@@ -68,6 +69,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
ev.Version = needle.Version3
if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
+ ev.datFileSize = volumeInfo.FileSize
} else {
volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
}
@@ -222,8 +224,17 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
shard := ev.Shards[0]
+ // Usually shard will be padded to round of ErasureCodingSmallBlockSize.
+ // So in most cases, if shardSize equals to n * ErasureCodingLargeBlockSize,
+ // the data would be in small blocks.
+ shardSize := shard.ecdFileSize - 1
+ if ev.datFileSize > 0 {
+ // To get the correct LargeBlockRowsCount
+ // use datFileSize to calculate the shardSize to match the EC encoding logic.
+ shardSize = ev.datFileSize / DataShardsCount
+ }
// calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset, types.Size(needle.GetActualSize(size, version)))
+ intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardSize, offset, types.Size(needle.GetActualSize(size, version)))
return
}
diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go
index 4ee170e02..47faf0b62 100644
--- a/weed/storage/erasure_coding/ec_volume_test.go
+++ b/weed/storage/erasure_coding/ec_volume_test.go
@@ -44,7 +44,7 @@ func TestPositioning(t *testing.T) {
fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3
- intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
for _, interval := range intervals {
shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)