| author | Chris Lu <chris.lu@gmail.com> | 2019-06-03 02:26:31 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-06-03 02:26:31 -0700 |
| commit | 7e80b2b8823a9bb8bac58100a76d6a5825c94be4 (patch) | |
| tree | f406744ba5e7302157de46a59b4e5c09abff067f /weed/storage/erasure_coding/ec_encoder.go | |
| parent | 55be09996d8f82e461e1c464db82707c982b2b57 (diff) | |
fix multiple bugs
Diffstat (limited to 'weed/storage/erasure_coding/ec_encoder.go')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | weed/storage/erasure_coding/ec_encoder.go | 105 |
1 file changed, 102 insertions, 3 deletions
```diff
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index da0cfcde8..090e6e075 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -9,6 +9,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/storage/idx"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/klauspost/reedsolomon"
 )
 
@@ -53,6 +54,10 @@ func WriteEcFiles(baseFileName string) error {
 	return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
 }
 
+func RebuildEcFiles(baseFileName string) ([]uint32, error) {
+	return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
+}
+
 func ToExt(ecIndex int) string {
 	return fmt.Sprintf(".ec%02d", ecIndex)
 }
@@ -75,6 +80,37 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64,
 	return nil
 }
 
+func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) (generatedShardIds []uint32, err error) {
+
+	shardHasData := make([]bool, TotalShardsCount)
+	inputFiles := make([]*os.File, TotalShardsCount)
+	outputFiles := make([]*os.File, TotalShardsCount)
+	for shardId := 0; shardId < TotalShardsCount; shardId++ {
+		shardFileName := baseFileName + ToExt(shardId)
+		if util.FileExists(shardFileName) {
+			shardHasData[shardId] = true
+			inputFiles[shardId], err = os.OpenFile(shardFileName, os.O_RDONLY, 0)
+			if err != nil {
+				return nil, err
+			}
+			defer inputFiles[shardId].Close()
+		} else {
+			outputFiles[shardId], err = os.OpenFile(shardFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644)
+			if err != nil {
+				return nil, err
+			}
+			defer outputFiles[shardId].Close()
+			generatedShardIds = append(generatedShardIds, uint32(shardId))
+		}
+	}
+
+	err = rebuildEcFiles(shardHasData, inputFiles, outputFiles)
+	if err != nil {
+		return nil, fmt.Errorf("rebuildEcFiles: %v", err)
+	}
+	return
+}
+
 func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
 	bufferSize := int64(len(buffers[0]))
 
@@ -150,20 +186,25 @@ func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blo
 }
 
 func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64) error {
+	var processedSize int64
+
 	enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
 	if err != nil {
 		return fmt.Errorf("failed to create encoder: %v", err)
 	}
+	buffers := make([][]byte, TotalShardsCount)
+	for i, _ := range buffers {
+		buffers[i] = make([]byte, bufferSize)
+	}
+
 	outputs, err := openEcFiles(baseFileName, false)
 	defer closeEcFiles(outputs)
 	if err != nil {
 		return fmt.Errorf("failed to open dat file: %v", err)
 	}
-	for i, _ := range buffers {
-		buffers[i] = make([]byte, bufferSize)
-	}
+
 	for remainingSize > largeBlockSize*DataShardsCount {
 		err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs)
 		if err != nil {
@@ -183,6 +224,64 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
 	return nil
 }
 
+func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*os.File) error {
+
+	enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
+	if err != nil {
+		return fmt.Errorf("failed to create encoder: %v", err)
+	}
+
+	buffers := make([][]byte, TotalShardsCount)
+	for i, _ := range buffers {
+		if shardHasData[i] {
+			buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
+		}
+	}
+
+	var startOffset int64
+	var inputBufferDataSize int
+	for {
+
+		// read the input data from files
+		for i := 0; i < TotalShardsCount; i++ {
+			if shardHasData[i] {
+				n, _ := inputFiles[i].ReadAt(buffers[i], startOffset)
+				if n == 0 {
+					return nil
+				}
+				if inputBufferDataSize == 0 {
+					inputBufferDataSize = n
+				}
+				if inputBufferDataSize != n {
+					return fmt.Errorf("ec shard size need to be the same")
+				}
+			} else {
+				buffers[i] = nil
+			}
+		}
+
+		fmt.Printf("reconstructing [%d,%d)\n", startOffset, startOffset+int64(inputBufferDataSize))
+
+		// encode the data
+		err = enc.Reconstruct(buffers)
+		if err != nil {
+			return fmt.Errorf("reconstruct: %v", err)
+		}
+
+		// write the data to output files
+		for i := 0; i < TotalShardsCount; i++ {
+			if !shardHasData[i] {
+				n, _ := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset)
+				if inputBufferDataSize != n {
+					return fmt.Errorf("fail to write to %s", outputFiles[i].Name())
+				}
+			}
+		}
+		startOffset += int64(inputBufferDataSize)
+	}
+
+}
+
 func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) {
 	indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
 	if err != nil {
```
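The heart of the new `rebuildEcFiles` loop is how `github.com/klauspost/reedsolomon` treats a shard slice: present shards are passed as byte slices, missing shards are passed as `nil`, and `Reconstruct` fills the `nil` slots in place. A minimal standalone sketch of that pattern, using an illustrative 10+4 shard layout and throwaway sample data (nothing here is taken from the commit itself):

```go
package main

import (
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	const dataShards, parityShards = 10, 4

	enc, err := reedsolomon.New(dataShards, parityShards)
	if err != nil {
		panic(err)
	}

	// Split sample data into 10 data shards, then compute the 4 parity shards.
	shards, err := enc.Split([]byte("example volume data to be erasure coded"))
	if err != nil {
		panic(err)
	}
	if err = enc.Encode(shards); err != nil {
		panic(err)
	}

	// Simulate missing shard files by nil-ing their slots,
	// the same way rebuildEcFiles handles shards without a local .ecXX file.
	shards[2], shards[11] = nil, nil

	// Reconstruct regenerates the nil entries from the surviving shards.
	if err = enc.Reconstruct(shards); err != nil {
		panic(err)
	}

	ok, err := enc.Verify(shards)
	fmt.Println("all shards consistent:", ok, err)
}
```

The rebuild loop in the diff follows the same shape, just block by block: read a buffer from every shard file that exists, nil out the slots for absent shards, call `Reconstruct`, and write only the regenerated slots back to disk at the same offset.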

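For context, the exported `RebuildEcFiles` added above takes the shard base name (the volume file path without the `.ecXX` extension, which the function appends itself via `ToExt`) and reports which shard ids it regenerated. A hedged sketch of a caller, with a purely hypothetical path; the real call sites presumably live elsewhere in the volume-server code:

```go
package main

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// Hypothetical base file name; RebuildEcFiles scans for the existing
	// .ecXX shard files next to it and regenerates the missing ones.
	baseFileName := "/data/example_volume_1"

	rebuilt, err := erasure_coding.RebuildEcFiles(baseFileName)
	if err != nil {
		log.Fatalf("rebuild ec files for %s: %v", baseFileName, err)
	}
	log.Printf("regenerated ec shard ids: %v", rebuilt)
}
```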