aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/erasure_coding
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/erasure_coding')
-rw-r--r--weed/storage/erasure_coding/ec_decoder.go198
-rw-r--r--weed/storage/erasure_coding/ec_encoder.go30
-rw-r--r--weed/storage/erasure_coding/ec_test.go16
-rw-r--r--weed/storage/erasure_coding/ec_volume.go15
-rw-r--r--weed/storage/erasure_coding/ec_volume_delete.go10
-rw-r--r--weed/storage/erasure_coding/ec_volume_info.go16
6 files changed, 258 insertions, 27 deletions
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
new file mode 100644
index 000000000..ae77cee3f
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -0,0 +1,198 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// WriteIdxFileFromEcIndex rebuilds baseFileName.idx from the erasure-coding
+// index files: the .ecx content is copied verbatim, then one tombstone entry
+// is appended for every needle id found in the .ecj file.
+//
+// An existing .idx file is truncated. Copy and write failures are returned
+// to the caller instead of silently leaving an incomplete index behind.
+func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
+
+	ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+	}
+	defer ecxFile.Close()
+
+	idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr)
+	}
+	defer idxFile.Close()
+
+	if _, copyErr := io.Copy(idxFile, ecxFile); copyErr != nil {
+		return fmt.Errorf("copy %s.ecx to %s.idx: %v", baseFileName, baseFileName, copyErr)
+	}
+
+	return iterateEcjFile(baseFileName, func(key types.NeedleId) error {
+
+		// each appended entry carries a zero Offset and TombstoneFileSize
+		bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
+		if _, writeErr := idxFile.Write(bytes); writeErr != nil {
+			return fmt.Errorf("write tombstone to %s.idx: %v", baseFileName, writeErr)
+		}
+
+		return nil
+	})
+}
+
+// FindDatFileSize derives the .dat file size from the .ecx index: it is the
+// largest end position (actual offset plus actual needle size) among the
+// entries that are not tombstones.
+// There may be extra deletions after that entry, but they are deletions
+// anyway and do not need to be counted.
+func FindDatFileSize(baseFileName string) (datSize int64, err error) {
+
+	volumeVersion, versionErr := readEcVolumeVersion(baseFileName)
+	if versionErr != nil {
+		return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, versionErr)
+	}
+
+	// keeps datSize at the furthest end position seen so far
+	trackLargestEnd := func(key types.NeedleId, offset types.Offset, size uint32) error {
+		if size != types.TombstoneFileSize {
+			if end := offset.ToAcutalOffset() + needle.GetActualSize(size, volumeVersion); end > datSize {
+				datSize = end
+			}
+		}
+		return nil
+	}
+
+	err = iterateEcxFile(baseFileName, trackLargestEnd)
+	return datSize, err
+}
+
+// readEcVolumeVersion returns the needle version recorded in the super block
+// that is read from the first shard file, baseFileName.ec00.
+func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {
+
+	shardFile, openErr := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
+	if openErr != nil {
+		return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, openErr)
+	}
+	shardBackend := backend.NewDiskFile(shardFile)
+
+	// the backend is needed for this single read only, so it is closed
+	// before the result is inspected
+	sb, readErr := super_block.ReadSuperBlock(shardBackend)
+	shardBackend.Close()
+	if readErr != nil {
+		return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, readErr)
+	}
+
+	version = sb.Version
+	return version, nil
+}
+
+// iterateEcxFile reads baseFileName.ecx one fixed-size entry at a time and
+// passes each decoded (key, offset, size) triple to processNeedleFn, which
+// may be nil.
+//
+// Iteration ends without error at end of file, or when the callback returns
+// io.EOF; any other callback error is returned as is. A read that yields
+// fewer bytes than one entry stops the iteration and returns that read's
+// error (nil for io.EOF).
+func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+	ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+	}
+	defer ecxFile.Close()
+
+	entry := make([]byte, types.NeedleMapEntrySize)
+	for {
+		readCount, stepErr := ecxFile.Read(entry)
+		if readCount != types.NeedleMapEntrySize {
+			if stepErr == io.EOF {
+				return nil
+			}
+			return stepErr
+		}
+
+		key, offset, size := idx.IdxFileEntry(entry)
+		if processNeedleFn != nil {
+			stepErr = processNeedleFn(key, offset, size)
+		}
+
+		switch stepErr {
+		case nil:
+			continue
+		case io.EOF:
+			// the callback asked to stop early
+			return nil
+		default:
+			return stepErr
+		}
+	}
+
+}
+
+// iterateEcjFile reads baseFileName.ecj one needle id at a time and passes
+// each id to processNeedleFn, which may be nil.
+//
+// Iteration ends without error at end of file, or when the callback returns
+// io.EOF; any other callback error is returned as is. A read that yields
+// fewer bytes than one needle id stops the iteration and returns that read's
+// error (nil for io.EOF).
+func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
+	ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
+	if openErr != nil {
+		// name the file that was actually opened: the .ecj file, not the .ecx index
+		return fmt.Errorf("cannot open ec journal %s.ecj: %v", baseFileName, openErr)
+	}
+	defer ecjFile.Close()
+
+	buf := make([]byte, types.NeedleIdSize)
+	for {
+		n, err := ecjFile.Read(buf)
+		if n != types.NeedleIdSize {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+		if processNeedleFn != nil {
+			err = processNeedleFn(types.BytesToNeedleId(buf))
+		}
+		if err != nil {
+			// io.EOF from the callback is a request to stop early, not a failure
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+	}
+
+}
+
+// WriteDatFile generates baseFileName.dat, datFileSize bytes long, from the
+// data shard files .ec00 ~ .ec09.
+//
+// The shards are read round-robin: first in ErasureCodingLargeBlockSize
+// blocks while at least one full row of large blocks remains, then in
+// ErasureCodingSmallBlockSize blocks until datFileSize bytes have been
+// written. An existing .dat file is truncated.
+//
+// It returns an error when the .dat file or a shard cannot be opened, when a
+// block cannot be copied in full, or when closing the .dat file fails.
+func WriteDatFile(baseFileName string, datFileSize int64) (err error) {
+
+	datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
+	}
+	defer func() {
+		// some write failures are only reported when the file is closed;
+		// surface them unless an earlier error is already being returned
+		if closeErr := datFile.Close(); closeErr != nil && err == nil {
+			err = fmt.Errorf("close volume %s.dat: %v", baseFileName, closeErr)
+		}
+	}()
+
+	inputFiles := make([]*os.File, DataShardsCount)
+
+	for shardId := 0; shardId < DataShardsCount; shardId++ {
+		shardFileName := baseFileName + ToExt(shardId)
+		inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0)
+		if openErr != nil {
+			return fmt.Errorf("cannot open ec shard %s: %v", shardFileName, openErr)
+		}
+		// deferred on purpose: every shard stays open until the whole .dat is written
+		defer inputFiles[shardId].Close()
+	}
+
+	// large-block rows: one large block from every data shard per row
+	for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
+		for shardId := 0; shardId < DataShardsCount; shardId++ {
+			w, copyErr := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
+			if w != ErasureCodingLargeBlockSize {
+				return fmt.Errorf("copy %s large block %d: %v", baseFileName, shardId, copyErr)
+			}
+			datFileSize -= ErasureCodingLargeBlockSize
+		}
+	}
+
+	// small-block rows: the final block may be shorter, and once nothing is
+	// left toRead becomes 0 so the rest of the row copies nothing
+	for datFileSize > 0 {
+		for shardId := 0; shardId < DataShardsCount; shardId++ {
+			toRead := min(datFileSize, ErasureCodingSmallBlockSize)
+			w, copyErr := io.CopyN(datFile, inputFiles[shardId], toRead)
+			if w != toRead {
+				return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, copyErr)
+			}
+			datFileSize -= toRead
+		}
+	}
+
+	return nil
+}
+
+// min returns the smaller of the two int64 values.
+func min(x, y int64) int64 {
+	smaller := x
+	if y < smaller {
+		smaller = y
+	}
+	return smaller
+}
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 97010a1ed..97c3ccbd9 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -5,12 +5,13 @@ import (
"io"
"os"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/klauspost/reedsolomon"
)
const (
@@ -21,35 +22,38 @@ const (
ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
)
-// WriteSortedEcxFile generates .ecx file from existing .idx file
+// WriteSortedFileFromIdx generates a sorted index file, baseFileName+ext (e.g. ".ecx"), from the existing .idx file
// all keys are sorted in ascending order
-func WriteSortedEcxFile(baseFileName string) (e error) {
+func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
- cm, err := readCompactMap(baseFileName)
+ nm, err := readNeedleMap(baseFileName)
+ if nm != nil {
+ defer nm.Close()
+ }
if err != nil {
- return fmt.Errorf("readCompactMap: %v", err)
+ return fmt.Errorf("readNeedleMap: %v", err)
}
- ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
+ ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open ecx file: %v", err)
}
defer ecxFile.Close()
- err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
bytes := value.ToBytes()
_, writeErr := ecxFile.Write(bytes)
return writeErr
})
if err != nil {
- return fmt.Errorf("failed to visit ecx file: %v", err)
+ return fmt.Errorf("failed to visit idx file: %v", err)
}
return nil
}
-// WriteEcFiles generates .ec01 ~ .ec14 files
+// WriteEcFiles generates .ec00 ~ .ec13 files
func WriteEcFiles(baseFileName string) error {
return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
}
@@ -195,7 +199,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
}
buffers := make([][]byte, TotalShardsCount)
- for i, _ := range buffers {
+ for i := range buffers {
buffers[i] = make([]byte, bufferSize)
}
@@ -232,7 +236,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
buffers := make([][]byte, TotalShardsCount)
- for i, _ := range buffers {
+ for i := range buffers {
if shardHasData[i] {
buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
}
@@ -280,14 +284,14 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
-func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) {
+func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
if err != nil {
return nil, fmt.Errorf("cannot read Volume Index %s.idx: %v", baseFileName, err)
}
defer indexFile.Close()
- cm := needle_map.NewCompactMap()
+ cm := needle_map.NewMemDb()
err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
if !offset.IsZero() && size != types.TombstoneFileSize {
cm.Set(key, offset, size)
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 57df09525..92b83cdc8 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -7,9 +7,10 @@ import (
"os"
"testing"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/klauspost/reedsolomon"
)
const (
@@ -26,14 +27,14 @@ func TestEncodingDecoding(t *testing.T) {
t.Logf("generateEcFiles: %v", err)
}
- err = WriteSortedEcxFile(baseFileName)
+ err = WriteSortedFileFromIdx(baseFileName, ".ecx")
if err != nil {
- t.Logf("WriteSortedEcxFile: %v", err)
+ t.Logf("WriteSortedFileFromIdx: %v", err)
}
err = validateFiles(baseFileName)
if err != nil {
- t.Logf("WriteSortedEcxFile: %v", err)
+ t.Logf("WriteSortedFileFromIdx: %v", err)
}
removeGeneratedFiles(baseFileName)
@@ -41,9 +42,10 @@ func TestEncodingDecoding(t *testing.T) {
}
func validateFiles(baseFileName string) error {
- cm, err := readCompactMap(baseFileName)
+ nm, err := readNeedleMap(baseFileName)
+ defer nm.Close()
if err != nil {
- return fmt.Errorf("readCompactMap: %v", err)
+ return fmt.Errorf("readNeedleMap: %v", err)
}
datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
@@ -60,7 +62,7 @@ func validateFiles(baseFileName string) error {
ecFiles, err := openEcFiles(baseFileName, true)
defer closeEcFiles(ecFiles)
- err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
return assertSame(datFile, fi.Size(), ecFiles, value.Offset, value.Size)
})
if err != nil {
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index bcae164ca..3d9aa2cff 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -9,7 +9,9 @@ import (
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -56,6 +58,14 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
}
+ // read volume info
+ ev.Version = needle.Version3
+ if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
+ ev.Version = needle.Version(volumeInfo.Version)
+ } else {
+ pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
+ }
+
ev.ShardLocations = make(map[ShardId][]string)
return
@@ -126,6 +136,7 @@ func (ev *EcVolume) Destroy() {
}
os.Remove(ev.FileName() + ".ecx")
os.Remove(ev.FileName() + ".ecj")
+ os.Remove(ev.FileName() + ".vif")
}
func (ev *EcVolume) FileName() string {
@@ -186,10 +197,10 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
}
func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
- return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
+ return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
-func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
+func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 04102ec9e..822a9e923 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -10,15 +10,15 @@ import (
)
var (
- markNeedleDeleted = func(file *os.File, offset int64) error {
+ MarkNeedleDeleted = func(file *os.File, offset int64) error {
b := make([]byte, types.SizeSize)
util.Uint32toBytes(b, types.TombstoneFileSize)
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
if err != nil {
- return fmt.Errorf("ecx write error: %v", err)
+ return fmt.Errorf("sorted needle write error: %v", err)
}
if n != types.SizeSize {
- return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize)
+ return fmt.Errorf("sorted needle written %d bytes, expecting %d", n, types.SizeSize)
}
return nil
}
@@ -26,7 +26,7 @@ var (
func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
- _, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted)
+ _, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted)
if err != nil {
if err == NotFoundError {
@@ -81,7 +81,7 @@ func RebuildEcxFile(baseFileName string) error {
needleId := types.BytesToNeedleId(buf)
- _, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted)
+ _, _, err = SearchNeedleFromSortedIndex(ecxFile, ecxFileSize, needleId, MarkNeedleDeleted)
if err != nil && err != NotFoundError {
ecxFile.Close()
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index c9e85c662..8ff65bb0f 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -81,6 +81,15 @@ func (b ShardBits) ShardIds() (ret []ShardId) {
return
}
+// ToUint32Slice lists, in ascending order, every id below TotalShardsCount
+// for which b.HasShardId reports true, as uint32 values.
+// The result is nil when no such id exists.
+func (b ShardBits) ToUint32Slice() (ret []uint32) {
+	for shardId := uint32(0); shardId < TotalShardsCount; shardId++ {
+		if !b.HasShardId(ShardId(shardId)) {
+			continue
+		}
+		ret = append(ret, shardId)
+	}
+	return ret
+}
+
func (b ShardBits) ShardIdCount() (count int) {
for count = 0; b > 0; count++ {
b &= b - 1
@@ -95,3 +104,10 @@ func (b ShardBits) Minus(other ShardBits) ShardBits {
func (b ShardBits) Plus(other ShardBits) ShardBits {
return b | other
}
+
+// MinusParityShards returns a copy of b with every shard id from
+// DataShardsCount up to, but not including, TotalShardsCount removed.
+func (b ShardBits) MinusParityShards() ShardBits {
+	dataOnly := b
+	for parityId := DataShardsCount; parityId < TotalShardsCount; parityId++ {
+		dataOnly = dataOnly.RemoveShardId(ShardId(parityId))
+	}
+	return dataOnly
+}