diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-12-23 12:48:20 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-12-23 12:48:20 -0800 |
| commit | 09ca936c78c1044412f47dc06caff1bf08273e60 (patch) | |
| tree | c8402cc969ca53b841bfdcb6ae23d3955a20e5ec /weed/storage/erasure_coding/ec_decoder.go | |
| parent | dda5c6d3cb32550777f1baf5bd00794e10e56d84 (diff) | |
| download | seaweedfs-09ca936c78c1044412f47dc06caff1bf08273e60.tar.xz seaweedfs-09ca936c78c1044412f47dc06caff1bf08273e60.zip | |
shell: add ec.decode command
Diffstat (limited to 'weed/storage/erasure_coding/ec_decoder.go')
| -rw-r--r-- | weed/storage/erasure_coding/ec_decoder.go | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go new file mode 100644 index 000000000..ae77cee3f --- /dev/null +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -0,0 +1,198 @@ +package erasure_coding + +import ( + "fmt" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/idx" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +// write .idx file from .ecx and .ecj files +func WriteIdxFileFromEcIndex(baseFileName string) (err error) { + + ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644) + if openErr != nil { + return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr) + } + defer ecxFile.Close() + + idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr) + } + defer idxFile.Close() + + io.Copy(idxFile, ecxFile) + + err = iterateEcjFile(baseFileName, func(key types.NeedleId) error { + + bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize) + idxFile.Write(bytes) + + return nil + }) + + return err +} + +// FindDatFileSize calculate .dat file size from max offset entry +// there may be extra deletions after that entry +// but they are deletions anyway +func FindDatFileSize(baseFileName string) (datSize int64, err error) { + + version, err := readEcVolumeVersion(baseFileName) + if err != nil { + return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err) + } + + err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error { + + if size == types.TombstoneFileSize { + return nil + } + + entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version) + if datSize < entryStopOffset { + datSize = entryStopOffset + } + + return nil + }) + + return +} + +func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) { + + // find volume version + datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644) + if err != nil { + return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err) + } + datBackend := backend.NewDiskFile(datFile) + + superBlock, err := super_block.ReadSuperBlock(datBackend) + datBackend.Close() + if err != nil { + return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err) + } + + return superBlock.Version, nil + +} + +func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error { + ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644) + if openErr != nil { + return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr) + } + defer ecxFile.Close() + + buf := make([]byte, types.NeedleMapEntrySize) + for { + n, err := ecxFile.Read(buf) + if n != types.NeedleMapEntrySize { + if err == io.EOF { + return nil + } + return err + } + key, offset, size := idx.IdxFileEntry(buf) + if processNeedleFn != nil { + err = processNeedleFn(key, offset, size) + } + if err != nil { + if err != io.EOF { + return err + } + return nil + } + } + +} + +func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error { + ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644) + if openErr != nil { + return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr) + } + defer ecjFile.Close() + + buf := make([]byte, types.NeedleIdSize) + for { + n, err := ecjFile.Read(buf) + if n != types.NeedleIdSize { + if err == io.EOF { + return nil + } + return err + } + if processNeedleFn != nil { + err = processNeedleFn(types.BytesToNeedleId(buf)) + } + if err != nil { + if err == io.EOF { + return nil + } + return err + } + } + +} + +// WriteDatFile generates .dat from from .ec00 ~ .ec09 files +func WriteDatFile(baseFileName string, datFileSize int64) error { + + datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr) + } + defer datFile.Close() + + inputFiles := make([]*os.File, DataShardsCount) + + for shardId := 0; shardId < DataShardsCount; shardId++ { + shardFileName := baseFileName + ToExt(shardId) + inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0) + if openErr != nil { + return openErr + } + defer inputFiles[shardId].Close() + } + + for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize { + for shardId := 0; shardId < DataShardsCount; shardId++ { + w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize) + if w != ErasureCodingLargeBlockSize { + return fmt.Errorf("copy %s large block %d: %v", baseFileName, shardId, err) + } + datFileSize -= ErasureCodingLargeBlockSize + } + } + + for datFileSize > 0 { + for shardId := 0; shardId < DataShardsCount; shardId++ { + toRead := min(datFileSize, ErasureCodingSmallBlockSize) + w, err := io.CopyN(datFile, inputFiles[shardId], toRead) + if w != toRead { + return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err) + } + datFileSize -= toRead + } + } + + return nil +} + +func min(x, y int64) int64 { + if x > y { + return y + } + return x +} |
