aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/erasure_coding
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-05-27 01:29:46 -0700
committerChris Lu <chris.lu@gmail.com>2019-05-27 01:29:46 -0700
commita4f3d82c57bca13321dca257891836ff36c7eca5 (patch)
treef65ffa52266d7ecc0d867c7bcdc404202e59a0f4 /weed/storage/erasure_coding
parenta463759edfbfafa04b3ae85ccb59ee27a6045ad4 (diff)
downloadseaweedfs-a4f3d82c57bca13321dca257891836ff36c7eca5.tar.xz
seaweedfs-a4f3d82c57bca13321dca257891836ff36c7eca5.zip
convert needle id to ec intervals to read from
Diffstat (limited to 'weed/storage/erasure_coding')
-rw-r--r--weed/storage/erasure_coding/ec_locate.go27
-rw-r--r--weed/storage/erasure_coding/ec_shard.go107
-rw-r--r--weed/storage/erasure_coding/ec_test.go26
-rw-r--r--weed/storage/erasure_coding/ec_volume.go74
4 files changed, 156 insertions, 78 deletions
diff --git a/weed/storage/erasure_coding/ec_locate.go b/weed/storage/erasure_coding/ec_locate.go
index b570f750c..ee8af3382 100644
--- a/weed/storage/erasure_coding/ec_locate.go
+++ b/weed/storage/erasure_coding/ec_locate.go
@@ -1,22 +1,25 @@
package erasure_coding
type Interval struct {
- blockIndex int
- innerBlockOffset int64
- size uint32
- isLargeBlock bool
+ BlockIndex int
+ InnerBlockOffset int64
+ Size uint32
+ IsLargeBlock bool
+ LargeBlockRowsCount int
}
-func locateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
+func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
- nLargeBlockRows := int(datSize / (largeBlockLength * DataShardsCount))
+ // adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size
+ nLargeBlockRows := int((datSize + DataShardsCount*smallBlockLength) / (largeBlockLength * DataShardsCount))
for size > 0 {
interval := Interval{
- blockIndex: blockIndex,
- innerBlockOffset: innerBlockOffset,
- isLargeBlock: isLargeBlock,
+ BlockIndex: blockIndex,
+ InnerBlockOffset: innerBlockOffset,
+ IsLargeBlock: isLargeBlock,
+ LargeBlockRowsCount: nLargeBlockRows,
}
blockRemaining := largeBlockLength - innerBlockOffset
@@ -25,14 +28,14 @@ func locateData(largeBlockLength, smallBlockLength int64, datSize int64, offset
}
if int64(size) <= blockRemaining {
- interval.size = size
+ interval.Size = size
intervals = append(intervals, interval)
return
}
- interval.size = uint32(blockRemaining)
+ interval.Size = uint32(blockRemaining)
intervals = append(intervals, interval)
- size -= interval.size
+ size -= interval.Size
blockIndex += 1
if isLargeBlock && blockIndex == nLargeBlockRows*DataShardsCount {
isLargeBlock = false
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
new file mode 100644
index 000000000..084d3f03b
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -0,0 +1,107 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "os"
+ "path"
+ "strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type ShardId uint8
+
+type EcVolumeShard struct {
+ VolumeId needle.VolumeId
+ ShardId ShardId
+ Collection string
+ dir string
+ ecdFile *os.File
+ ecdFileSize int64
+ ecxFile *os.File
+ ecxFileSize int64
+}
+
+func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
+
+ v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
+
+ baseFileName := v.FileName()
+
+ // open ecx file
+ if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
+ return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
+ }
+ ecxFi, statErr := v.ecxFile.Stat()
+ if statErr != nil {
+ return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
+ }
+ v.ecxFileSize = ecxFi.Size()
+
+ // open ecd file
+ if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
+ return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
+ }
+ ecdFi, statErr := v.ecdFile.Stat()
+ if statErr != nil {
+ return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr)
+ }
+ v.ecdFileSize = ecdFi.Size()
+
+ return
+}
+
+func (shard *EcVolumeShard) String() string {
+ return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", shard.VolumeId, shard.ShardId, shard.dir, shard.Collection)
+}
+
+func (shard *EcVolumeShard) FileName() (fileName string) {
+ return EcShardFileName(shard.Collection, shard.dir, int(shard.VolumeId))
+}
+
+func EcShardFileName(collection string, dir string, id int) (fileName string) {
+ idString := strconv.Itoa(id)
+ if collection == "" {
+ fileName = path.Join(dir, idString)
+ } else {
+ fileName = path.Join(dir, collection+"_"+idString)
+ }
+ return
+}
+
+func (shard *EcVolumeShard) Close() {
+ if shard.ecdFile != nil {
+ _ = shard.ecdFile.Close()
+ shard.ecdFile = nil
+ }
+ if shard.ecxFile != nil {
+ _ = shard.ecxFile.Close()
+ shard.ecxFile = nil
+ }
+}
+
+func (shard *EcVolumeShard) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+ var key types.NeedleId
+ buf := make([]byte, types.NeedleMapEntrySize)
+ l, h := int64(0), shard.ecxFileSize/types.NeedleMapEntrySize
+ for l < h {
+ m := (l + h) / 2
+ if _, err := shard.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
+ return types.Offset{}, 0, err
+ }
+ key, offset, size = idx.IdxFileEntry(buf)
+ if key == needleId {
+ return
+ }
+ if key < needleId {
+ l = m + 1
+ } else {
+ h = m
+ }
+ }
+
+ err = fmt.Errorf("needle id %d not found", needleId)
+ return
+}
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index ecf73ac96..83b0bc23a 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -103,7 +103,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, er
func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) (data []byte, err error) {
- intervals := locateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
+ intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
nLargeBlockRows := int(datSize / (largeBlockSize * DataShardsCount))
@@ -123,20 +123,20 @@ func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uin
}
func readOneInterval(interval Interval, ecFiles []*os.File, nLargeBlockRows int) (data []byte, err error) {
- ecFileOffset := interval.innerBlockOffset
- rowIndex := interval.blockIndex / DataShardsCount
- if interval.isLargeBlock {
+ ecFileOffset := interval.InnerBlockOffset
+ rowIndex := interval.BlockIndex / DataShardsCount
+ if interval.IsLargeBlock {
ecFileOffset += int64(rowIndex) * largeBlockSize
} else {
ecFileOffset += int64(nLargeBlockRows)*largeBlockSize + int64(rowIndex)*smallBlockSize
}
- ecFileIndex := interval.blockIndex % DataShardsCount
+ ecFileIndex := interval.BlockIndex % DataShardsCount
- data = make([]byte, interval.size)
+ data = make([]byte, interval.Size)
err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset)
{ // do some ec testing
- ecData, err := readFromOtherEcFiles(ecFiles, ecFileIndex, ecFileOffset, interval.size)
+ ecData, err := readFromOtherEcFiles(ecFiles, ecFileIndex, ecFileOffset, interval.Size)
if err != nil {
return nil, fmt.Errorf("ec reconstruct error: %v", err)
}
@@ -194,7 +194,7 @@ func removeGeneratedFiles(baseFileName string) {
}
func TestLocateData(t *testing.T) {
- intervals := locateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
+ intervals := LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
if len(intervals) != 1 {
t.Errorf("unexpected interval size %d", len(intervals))
}
@@ -202,13 +202,13 @@ func TestLocateData(t *testing.T) {
t.Errorf("unexpected interval %+v", intervals[0])
}
- intervals = locateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
+ intervals = LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
fmt.Printf("%+v\n", intervals)
}
func (this Interval) sameAs(that Interval) bool {
- return this.isLargeBlock == that.isLargeBlock &&
- this.innerBlockOffset == that.innerBlockOffset &&
- this.blockIndex == that.blockIndex &&
- this.size == that.size
+ return this.IsLargeBlock == that.IsLargeBlock &&
+ this.InnerBlockOffset == that.InnerBlockOffset &&
+ this.BlockIndex == that.BlockIndex &&
+ this.Size == that.Size
}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 11dcd0860..d57a28449 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -1,44 +1,15 @@
package erasure_coding
import (
- "fmt"
"math"
- "os"
- "path"
"sort"
- "strconv"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
-type ShardId uint8
-
-type EcVolumeShard struct {
- VolumeId needle.VolumeId
- ShardId ShardId
- Collection string
- dir string
- ecdFile *os.File
- ecxFile *os.File
-}
type EcVolumeShards []*EcVolumeShard
-func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
-
- v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
-
- baseFileName := v.FileName()
- if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
- return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
- }
- if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
- return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
- }
-
- return
-}
-
func (shards *EcVolumeShards) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
for _, s := range *shards {
if s.ShardId == ecVolumeShard.ShardId {
@@ -68,6 +39,15 @@ func (shards *EcVolumeShards) DeleteEcVolumeShard(ecVolumeShard *EcVolumeShard)
return true
}
+func (shards *EcVolumeShards) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
+ for _, s := range *shards {
+ if s.ShardId == shardId {
+ return s, true
+ }
+ }
+ return nil, false
+}
+
func (shards *EcVolumeShards) Close() {
for _, s := range *shards {
s.Close()
@@ -91,31 +71,19 @@ func (shards *EcVolumeShards) ToVolumeEcShardInformationMessage() (messages []*m
return
}
-func (v *EcVolumeShard) String() string {
- return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", v.VolumeId, v.ShardId, v.dir, v.Collection)
-}
-
-func (v *EcVolumeShard) FileName() (fileName string) {
- return EcShardFileName(v.Collection, v.dir, int(v.VolumeId))
-}
+func (shards *EcVolumeShards) ReadEcShardNeedle(n *needle.Needle) (int, error) {
-func EcShardFileName(collection string, dir string, id int) (fileName string) {
- idString := strconv.Itoa(id)
- if collection == "" {
- fileName = path.Join(dir, idString)
- } else {
- fileName = path.Join(dir, collection+"_"+idString)
+ shard := (*shards)[0]
+ // find the needle from ecx file
+ offset, size, err := shard.findNeedleFromEcx(n.Id)
+ if err != nil {
+ return 0, err
}
- return
-}
-func (v *EcVolumeShard) Close() {
- if v.ecdFile != nil {
- _ = v.ecdFile.Close()
- v.ecdFile = nil
- }
- if v.ecxFile != nil {
- _ = v.ecxFile.Close()
- v.ecxFile = nil
- }
+ // calculate the locations in the ec shards
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shard.ecxFileSize, offset.ToAcutalOffset(), size)
+
+ // TODO read the intervals
+
+ return len(intervals), nil
}