Diffstat (limited to 'weed/storage/erasure_coding')
-rw-r--r--  weed/storage/erasure_coding/389.ecx              bin 0 -> 7761568 bytes
-rw-r--r--  weed/storage/erasure_coding/ec_decoder.go         20
-rw-r--r--  weed/storage/erasure_coding/ec_encoder.go         24
-rw-r--r--  weed/storage/erasure_coding/ec_locate.go          10
-rw-r--r--  weed/storage/erasure_coding/ec_shard.go           14
-rw-r--r--  weed/storage/erasure_coding/ec_test.go            22
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go          62
-rw-r--r--  weed/storage/erasure_coding/ec_volume_delete.go    2
-rw-r--r--  weed/storage/erasure_coding/ec_volume_info.go      6
-rw-r--r--  weed/storage/erasure_coding/ec_volume_test.go     54
10 files changed, 158 insertions, 56 deletions
diff --git a/weed/storage/erasure_coding/389.ecx b/weed/storage/erasure_coding/389.ecx
new file mode 100644
index 000000000..158781920
--- /dev/null
+++ b/weed/storage/erasure_coding/389.ecx
Binary files differ
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index ae77cee3f..47d3c6550 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
// write .idx file from .ecx and .ecj files
@@ -44,20 +45,20 @@ func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
// FindDatFileSize calculate .dat file size from max offset entry
// there may be extra deletions after that entry
// but they are deletions anyway
-func FindDatFileSize(baseFileName string) (datSize int64, err error) {
+func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) {
- version, err := readEcVolumeVersion(baseFileName)
+ version, err := readEcVolumeVersion(dataBaseFileName)
if err != nil {
- return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
+ return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
}
- err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {
+ err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return nil
}
- entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
+ entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version)
if datSize < entryStopOffset {
datSize = entryStopOffset
}
@@ -87,7 +88,7 @@ func readEcVolumeVersion(baseFileName string) (version needle.Version, err error
}
-func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
if openErr != nil {
return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
@@ -118,9 +119,12 @@ func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId
}
func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
+ if !util.FileExists(baseFileName + ".ecj") {
+ return nil
+ }
ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
if openErr != nil {
- return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+ return fmt.Errorf("cannot open ec index %s.ecj: %v", baseFileName, openErr)
}
defer ecjFile.Close()
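With the data and index directories now split, FindDatFileSize takes two base names: the version is read from the data-side files, while the .ecx entries (and the now-optional .ecj file) are read from the index side. A minimal caller sketch, assuming illustrative paths and base-name layout that are not part of this patch:

```go
package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// Hypothetical locations: shard and .vif files under /data, .ecx/.ecj under /index.
	dataBase := "/data/collection1_389"
	indexBase := "/index/collection1_389"

	// Tombstoned entries are skipped via Size.IsDeleted(); the maximum
	// entry end offset becomes the reconstructed .dat size.
	datSize, err := erasure_coding.FindDatFileSize(dataBase, indexBase)
	if err != nil {
		log.Fatalf("FindDatFileSize: %v", err)
	}
	fmt.Printf("reconstructed .dat size: %d bytes\n", datSize)
}
```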
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index eeb384b91..34b639407 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -5,12 +5,13 @@ import (
"io"
"os"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/klauspost/reedsolomon"
)
const (
@@ -25,9 +26,12 @@ const (
// all keys are sorted in ascending order
func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
- cm, err := readCompactMap(baseFileName)
+ nm, err := readNeedleMap(baseFileName)
+ if nm != nil {
+ defer nm.Close()
+ }
if err != nil {
- return fmt.Errorf("readCompactMap: %v", err)
+ return fmt.Errorf("readNeedleMap: %v", err)
}
ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
@@ -36,7 +40,7 @@ func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
}
defer ecxFile.Close()
- err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
bytes := value.ToBytes()
_, writeErr := ecxFile.Write(bytes)
return writeErr
@@ -73,6 +77,8 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64,
if err != nil {
return fmt.Errorf("failed to stat dat file: %v", err)
}
+
+ glog.V(0).Infof("encodeDatFile %s.dat size:%d", baseFileName, fi.Size())
err = encodeDatFile(fi.Size(), err, baseFileName, bufferSize, largeBlockSize, file, smallBlockSize)
if err != nil {
return fmt.Errorf("encodeDatFile: %v", err)
@@ -195,7 +201,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
}
buffers := make([][]byte, TotalShardsCount)
- for i, _ := range buffers {
+ for i := range buffers {
buffers[i] = make([]byte, bufferSize)
}
@@ -232,7 +238,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
buffers := make([][]byte, TotalShardsCount)
- for i, _ := range buffers {
+ for i := range buffers {
if shardHasData[i] {
buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
}
@@ -280,15 +286,15 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
-func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) {
+func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
if err != nil {
return nil, fmt.Errorf("cannot read Volume Index %s.idx: %v", baseFileName, err)
}
defer indexFile.Close()
- cm := needle_map.NewCompactMap()
- err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
+ cm := needle_map.NewMemDb()
+ err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
if !offset.IsZero() && size != types.TombstoneFileSize {
cm.Set(key, offset, size)
} else {
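The sorted-index helper now goes through a needle_map.MemDb instead of a CompactMap. A hedged usage sketch of this patch's WriteSortedFileFromIdx entry point (the base path is illustrative):

```go
package main

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	baseFileName := "/data/collection1_389" // expects collection1_389.idx next to it

	// WriteSortedFileFromIdx walks the .idx file into a needle_map.MemDb,
	// applying deletions, then writes the surviving entries to
	// baseFileName+".ecx" in ascending needle-id order.
	if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
		log.Fatalf("WriteSortedFileFromIdx: %v", err)
	}
}
```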
diff --git a/weed/storage/erasure_coding/ec_locate.go b/weed/storage/erasure_coding/ec_locate.go
index 562966f8f..19eba6235 100644
--- a/weed/storage/erasure_coding/ec_locate.go
+++ b/weed/storage/erasure_coding/ec_locate.go
@@ -1,14 +1,18 @@
package erasure_coding
+import (
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
type Interval struct {
BlockIndex int
InnerBlockOffset int64
- Size uint32
+ Size types.Size
IsLargeBlock bool
LargeBlockRowsCount int
}
-func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
+func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size types.Size) (intervals []Interval) {
blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
// adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size
@@ -32,7 +36,7 @@ func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset
intervals = append(intervals, interval)
return
}
- interval.Size = uint32(blockRemaining)
+ interval.Size = types.Size(blockRemaining)
intervals = append(intervals, interval)
size -= interval.Size
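LocateData and Interval.Size now use types.Size instead of uint32. A small sketch of mapping a needle's byte range onto block intervals; the sizes and offsets below are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

func main() {
	var datSize int64 = 8 << 30      // size of the original .dat file (illustrative)
	var needleOffset int64 = 3 << 30 // needle offset inside the .dat file
	needleSize := types.Size(66044)  // needle size, now types.Size rather than uint32

	intervals := erasure_coding.LocateData(
		erasure_coding.ErasureCodingLargeBlockSize,
		erasure_coding.ErasureCodingSmallBlockSize,
		datSize, needleOffset, needleSize)

	for _, interval := range intervals {
		fmt.Printf("block=%d large=%v innerOffset=%d size=%d\n",
			interval.BlockIndex, interval.IsLargeBlock,
			interval.InnerBlockOffset, interval.Size)
	}
}
```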
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
index 47e6d3d1e..2a57d85ef 100644
--- a/weed/storage/erasure_coding/ec_shard.go
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -2,9 +2,11 @@ package erasure_coding
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"path"
"strconv"
+ "strings"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -19,21 +21,25 @@ type EcVolumeShard struct {
dir string
ecdFile *os.File
ecdFileSize int64
+ DiskType types.DiskType
}
-func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
+func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
- v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
+ v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType}
baseFileName := v.FileName()
// open ecd file
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
- return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
+ if e == os.ErrNotExist || strings.Contains(e.Error(), "no such file or directory") {
+ return nil, os.ErrNotExist
+ }
+ return nil, fmt.Errorf("cannot read ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), e)
}
ecdFi, statErr := v.ecdFile.Stat()
if statErr != nil {
- return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr)
+ return nil, fmt.Errorf("can not stat ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), statErr)
}
v.ecdFileSize = ecdFi.Size()
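NewEcVolumeShard now carries a disk type and returns os.ErrNotExist when the shard file is simply absent, so callers can tell "not on this disk" apart from real failures. A hedged sketch; the path, collection, ids, and disk type value are illustrative:

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

func main() {
	shard, err := erasure_coding.NewEcVolumeShard(
		types.DiskType("hdd"), // disk type is now part of the shard identity
		"/data", "collection1", needle.VolumeId(389), erasure_coding.ShardId(3))
	if err == os.ErrNotExist {
		fmt.Println("shard .ec03 is not present on this disk")
		return
	}
	if err != nil {
		log.Fatalf("NewEcVolumeShard: %v", err)
	}
	fmt.Printf("opened %s%s\n", shard.FileName(), erasure_coding.ToExt(int(shard.ShardId)))
}
```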
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 0e4aaa27c..0d48bec02 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -7,9 +7,10 @@ import (
"os"
"testing"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/klauspost/reedsolomon"
)
const (
@@ -41,9 +42,10 @@ func TestEncodingDecoding(t *testing.T) {
}
func validateFiles(baseFileName string) error {
- cm, err := readCompactMap(baseFileName)
+ nm, err := readNeedleMap(baseFileName)
+ defer nm.Close()
if err != nil {
- return fmt.Errorf("readCompactMap: %v", err)
+ return fmt.Errorf("readNeedleMap: %v", err)
}
datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
@@ -60,7 +62,7 @@ func validateFiles(baseFileName string) error {
ecFiles, err := openEcFiles(baseFileName, true)
defer closeEcFiles(ecFiles)
- err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
return assertSame(datFile, fi.Size(), ecFiles, value.Offset, value.Size)
})
if err != nil {
@@ -69,7 +71,7 @@ func validateFiles(baseFileName string) error {
return nil
}
-func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) error {
+func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) error {
data, err := readDatFile(datFile, offset, size)
if err != nil {
@@ -88,10 +90,10 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type
return nil
}
-func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, error) {
+func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) {
data := make([]byte, size)
- n, err := datFile.ReadAt(data, offset.ToAcutalOffset())
+ n, err := datFile.ReadAt(data, offset.ToActualOffset())
if err != nil {
return nil, fmt.Errorf("failed to ReadAt dat file: %v", err)
}
@@ -101,9 +103,9 @@ func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, er
return data, nil
}
-func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) (data []byte, err error) {
+func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
- intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
+ intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToActualOffset(), size)
for i, interval := range intervals {
if d, e := readOneInterval(interval, ecFiles); e != nil {
@@ -138,7 +140,7 @@ func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err er
return
}
-func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size uint32) (data []byte, err error) {
+func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size types.Size) (data []byte, err error) {
enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
if err != nil {
return nil, fmt.Errorf("failed to create encoder: %v", err)
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 579f037fb..171db92a4 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -25,6 +25,7 @@ type EcVolume struct {
VolumeId needle.VolumeId
Collection string
dir string
+ dirIdx string
ecxFile *os.File
ecxFileSize int64
ecxCreatedAt time.Time
@@ -35,35 +36,37 @@ type EcVolume struct {
Version needle.Version
ecjFile *os.File
ecjFileAccessLock sync.Mutex
+ diskType types.DiskType
}
-func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
- ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
+func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
+ ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
- baseFileName := EcShardFileName(collection, dir, int(vid))
+ dataBaseFileName := EcShardFileName(collection, dir, int(vid))
+ indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
// open ecx file
- if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
+ if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
- return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
+ return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
}
ev.ecxFileSize = ecxFi.Size()
ev.ecxCreatedAt = ecxFi.ModTime()
// open ecj file
- if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
+ if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
}
// read volume info
ev.Version = needle.Version3
- if volumeInfo, found := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
+ if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
} else {
- pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
+ pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
}
ev.ShardLocations = make(map[ShardId][]string)
@@ -134,24 +137,42 @@ func (ev *EcVolume) Destroy() {
for _, s := range ev.Shards {
s.Destroy()
}
- os.Remove(ev.FileName() + ".ecx")
- os.Remove(ev.FileName() + ".ecj")
- os.Remove(ev.FileName() + ".vif")
+ os.Remove(ev.FileName(".ecx"))
+ os.Remove(ev.FileName(".ecj"))
+ os.Remove(ev.FileName(".vif"))
}
-func (ev *EcVolume) FileName() string {
+func (ev *EcVolume) FileName(ext string) string {
+ switch ext {
+ case ".ecx", ".ecj":
+ return ev.IndexBaseFileName() + ext
+ }
+ // .vif
+ return ev.DataBaseFileName() + ext
+}
+func (ev *EcVolume) DataBaseFileName() string {
return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
+}
+func (ev *EcVolume) IndexBaseFileName() string {
+ return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
}
-func (ev *EcVolume) ShardSize() int64 {
+func (ev *EcVolume) ShardSize() uint64 {
if len(ev.Shards) > 0 {
- return ev.Shards[0].Size()
+ return uint64(ev.Shards[0].Size())
}
return 0
}
+func (ev *EcVolume) Size() (size int64) {
+ for _, shard := range ev.Shards {
+ size += shard.Size()
+ }
+ return
+}
+
func (ev *EcVolume) CreatedAt() time.Time {
return ev.ecxCreatedAt
}
@@ -171,6 +192,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
m = &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
Collection: s.Collection,
+ DiskType: string(ev.diskType),
}
messages = append(messages, m)
}
@@ -180,7 +202,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
return
}
-func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) {
+func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
// find the needle from ecx file
offset, size, err = ev.FindNeedleFromEcx(needleId)
@@ -191,16 +213,16 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
shard := ev.Shards[0]
// calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version)))
+ intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
return
}
-func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
-func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
+func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
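NewEcVolume now takes a disk type plus separate data and index directories, and FileName(ext) routes by extension: .ecx and .ecj resolve under dirIdx, everything else (such as .vif) under dir. A sketch under assumed paths and an illustrative disk type:

```go
package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

func main() {
	ev, err := erasure_coding.NewEcVolume(
		types.DiskType("hdd"), // illustrative disk type
		"/data",               // dir: shard files and .vif
		"/index",              // dirIdx: .ecx and .ecj
		"collection1", needle.VolumeId(389))
	if err != nil {
		log.Fatalf("NewEcVolume: %v", err)
	}

	fmt.Println(ev.FileName(".ecx")) // built from IndexBaseFileName(), under /index
	fmt.Println(ev.FileName(".vif")) // built from DataBaseFileName(), under /data
	fmt.Printf("shard size: %d, total size: %d\n", ev.ShardSize(), ev.Size())
}
```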
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 822a9e923..a7f8c24a3 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -12,7 +12,7 @@ import (
var (
MarkNeedleDeleted = func(file *os.File, offset int64) error {
b := make([]byte, types.SizeSize)
- util.Uint32toBytes(b, types.TombstoneFileSize)
+ types.SizeToBytes(b, types.TombstoneFileSize)
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
if err != nil {
return fmt.Errorf("sorted needle write error: %v", err)
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index 8ff65bb0f..3dd535e64 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -10,13 +10,15 @@ type EcVolumeInfo struct {
VolumeId needle.VolumeId
Collection string
ShardBits ShardBits
+ DiskType string
}
-func NewEcVolumeInfo(collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
+func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
return &EcVolumeInfo{
Collection: collection,
VolumeId: vid,
ShardBits: shardBits,
+ DiskType: diskType,
}
}
@@ -45,6 +47,7 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo {
VolumeId: ecInfo.VolumeId,
Collection: ecInfo.Collection,
ShardBits: ecInfo.ShardBits.Minus(other.ShardBits),
+ DiskType: ecInfo.DiskType,
}
return ret
@@ -55,6 +58,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.
Id: uint32(ecInfo.VolumeId),
EcIndexBits: uint32(ecInfo.ShardBits),
Collection: ecInfo.Collection,
+ DiskType: ecInfo.DiskType,
}
}
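EcVolumeInfo now carries the disk type through Minus() and into the heartbeat message. A small sketch of the constructor and message conversion; the disk type, collection, volume id, and shard bits are illustrative:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)

func main() {
	info := erasure_coding.NewEcVolumeInfo(
		"hdd", // DiskType travels as a plain string here
		"collection1", needle.VolumeId(389),
		erasure_coding.ShardBits(0b111)) // shards 0, 1, 2 present

	msg := info.ToVolumeEcShardInformationMessage()
	fmt.Printf("vid=%d bits=%b collection=%s disk=%s\n",
		msg.Id, msg.EcIndexBits, msg.Collection, msg.DiskType)
}
```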
diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go
new file mode 100644
index 000000000..747ef4aab
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_volume_test.go
@@ -0,0 +1,54 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func TestPositioning(t *testing.T) {
+
+ ecxFile, err := os.OpenFile("389.ecx", os.O_RDONLY, 0)
+ if err != nil {
+ t.Errorf("failed to open ecx file: %v", err)
+ }
+ defer ecxFile.Close()
+
+ stat, _ := ecxFile.Stat()
+ fileSize := stat.Size()
+
+ tests := []struct {
+ needleId string
+ offset int64
+ size int
+ }{
+ {needleId: "0f0edb92", offset: 31300679656, size: 1167},
+ {needleId: "0ef7d7f8", offset: 11513014944, size: 66044},
+ }
+
+ for _, test := range tests {
+ needleId, _ := types.ParseNeedleId(test.needleId)
+ offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
+ assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
+ fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
+ }
+
+ needleId, _ := types.ParseNeedleId("0f087622")
+ offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
+ assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
+ fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
+
+ var shardEcdFileSize int64 = 1118830592 // 1067 * 1024 * 1024, an example .ecd shard size
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
+
+ for _, interval := range intervals {
+ shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
+ fmt.Printf("interval: %+v, shardId: %d, shardOffset: %d\n", interval, shardId, shardOffset)
+ }
+
+}