aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/erasure_coding/ec_encoder.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/erasure_coding/ec_encoder.go')
-rw-r--r-- weed/storage/erasure_coding/ec_encoder.go 105
1 files changed, 102 insertions, 3 deletions
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index da0cfcde8..090e6e075 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/klauspost/reedsolomon"
)
@@ -53,6 +54,10 @@ func WriteEcFiles(baseFileName string) error {
return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
}
+// RebuildEcFiles regenerates the erasure-coding shard files of baseFileName
+// that are not present on disk by delegating to generateMissingEcFiles.
+// It returns the ids of the shards that were generated.
+func RebuildEcFiles(baseFileName string) ([]uint32, error) {
+	// Same buffer size that WriteEcFiles passes to generateEcFiles.
+	const rebuildBufferSize = 256 * 1024
+	return generateMissingEcFiles(
+		baseFileName,
+		rebuildBufferSize,
+		ErasureCodingLargeBlockSize,
+		ErasureCodingSmallBlockSize,
+	)
+}
+
// ToExt returns the file extension used for the shard with the given index:
// ".ec" followed by the index, zero-padded to at least two digits (e.g. ".ec03").
func ToExt(ecIndex int) string {
	const shardExtFormat = ".ec%02d"
	ext := fmt.Sprintf(shardExtFormat, ecIndex)
	return ext
}
@@ -75,6 +80,37 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64,
return nil
}
+// generateMissingEcFiles rebuilds every shard file of a volume that is absent on disk.
+//
+// For each shard id in [0, TotalShardsCount) the file baseFileName+ToExt(shardId) is
+// probed with util.FileExists. Files that exist are opened read-only and used as
+// reconstruction input; files that do not exist are created (or truncated) and used
+// as reconstruction output. rebuildEcFiles then fills the output files.
+//
+// It returns the ids of the shards that were generated. On any error the returned
+// ids are nil and every shard file created by this call is removed again, so that an
+// empty or partially written shard is not mistaken for valid input by a later rebuild.
+//
+// bufferSize, largeBlockSize and smallBlockSize are not used by this function body.
+func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) (generatedShardIds []uint32, err error) {
+
+	shardHasData := make([]bool, TotalShardsCount)
+	inputFiles := make([]*os.File, TotalShardsCount)
+	outputFiles := make([]*os.File, TotalShardsCount)
+
+	// Registered before any file is opened so that it runs last, i.e. after the
+	// deferred Close calls below have released the file handles.
+	defer func() {
+		if err == nil {
+			return
+		}
+		for _, created := range outputFiles {
+			if created != nil {
+				// Best-effort cleanup: the original failure is what the caller
+				// needs to see, so a removal error is deliberately dropped.
+				_ = os.Remove(created.Name())
+			}
+		}
+	}()
+
+	for shardId := 0; shardId < TotalShardsCount; shardId++ {
+		shardFileName := baseFileName + ToExt(shardId)
+		if util.FileExists(shardFileName) {
+			shardHasData[shardId] = true
+			inputFiles[shardId], err = os.OpenFile(shardFileName, os.O_RDONLY, 0)
+			if err != nil {
+				return nil, err
+			}
+			// Files must stay open until rebuildEcFiles is done, hence defer in the loop.
+			defer inputFiles[shardId].Close()
+		} else {
+			outputFiles[shardId], err = os.OpenFile(shardFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644)
+			if err != nil {
+				return nil, err
+			}
+			defer outputFiles[shardId].Close()
+			generatedShardIds = append(generatedShardIds, uint32(shardId))
+		}
+	}
+
+	err = rebuildEcFiles(shardHasData, inputFiles, outputFiles)
+	if err != nil {
+		return nil, fmt.Errorf("rebuildEcFiles: %v", err)
+	}
+	return generatedShardIds, nil
+}
+
func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
bufferSize := int64(len(buffers[0]))
@@ -150,20 +186,25 @@ func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blo
}
func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64) error {
+
var processedSize int64
+
enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
if err != nil {
return fmt.Errorf("failed to create encoder: %v", err)
}
+
buffers := make([][]byte, TotalShardsCount)
+ for i, _ := range buffers {
+ buffers[i] = make([]byte, bufferSize)
+ }
+
outputs, err := openEcFiles(baseFileName, false)
defer closeEcFiles(outputs)
if err != nil {
return fmt.Errorf("failed to open dat file: %v", err)
}
- for i, _ := range buffers {
- buffers[i] = make([]byte, bufferSize)
- }
+
for remainingSize > largeBlockSize*DataShardsCount {
err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs)
if err != nil {
@@ -183,6 +224,64 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
return nil
}
+// rebuildEcFiles reconstructs the missing shards from the shards that are present.
+//
+// shardHasData[i] tells whether shard i is available: if so inputFiles[i] is read,
+// otherwise the reconstructed content is written to outputFiles[i]. All three slices
+// are indexed by shard id and are expected to hold TotalShardsCount entries.
+//
+// The shards are processed in batches of at most ErasureCodingSmallBlockSize bytes,
+// all read and written at the same offset. Processing stops without error as soon as
+// a present shard yields no more bytes. An error is returned when the encoder cannot
+// be created, when the present shards deliver different byte counts for a batch,
+// when reconstruction fails, or when an output file cannot be written completely.
+func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*os.File) error {
+
+	enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
+	if err != nil {
+		return fmt.Errorf("failed to create encoder: %v", err)
+	}
+
+	buffers := make([][]byte, TotalShardsCount)
+	for i := range buffers {
+		if shardHasData[i] {
+			buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
+		}
+	}
+
+	var startOffset int64
+	for {
+
+		// Size of the current batch, taken from the first present shard. It is
+		// reset for every batch so that a shorter final batch is accepted as long
+		// as all present shards agree on its length.
+		inputBufferDataSize := 0
+
+		// read the input data from files
+		for i := 0; i < TotalShardsCount; i++ {
+			if !shardHasData[i] {
+				// a nil entry marks the shard as missing for Reconstruct
+				buffers[i] = nil
+				continue
+			}
+			// The read error is ignored as before: a short read at the end of the
+			// file reports an error together with n > 0.
+			// NOTE(review): genuine I/O errors are dropped here as well — confirm.
+			n, _ := inputFiles[i].ReadAt(buffers[i], startOffset)
+			if n == 0 {
+				return nil
+			}
+			if inputBufferDataSize == 0 {
+				inputBufferDataSize = n
+			}
+			if inputBufferDataSize != n {
+				return fmt.Errorf("ec shard size need to be the same")
+			}
+		}
+
+		fmt.Printf("reconstructing [%d,%d)\n", startOffset, startOffset+int64(inputBufferDataSize))
+
+		// rebuild the missing shards in place
+		err = enc.Reconstruct(buffers)
+		if err != nil {
+			return fmt.Errorf("reconstruct: %v", err)
+		}
+
+		// write the data to output files
+		for i := 0; i < TotalShardsCount; i++ {
+			if shardHasData[i] {
+				continue
+			}
+			n, writeErr := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset)
+			if writeErr != nil || n != inputBufferDataSize {
+				return fmt.Errorf("fail to write to %s: %v", outputFiles[i].Name(), writeErr)
+			}
+		}
+		startOffset += int64(inputBufferDataSize)
+	}
+
+}
+
func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) {
indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
if err != nil {