diff options
Diffstat (limited to 'weed/storage/erasure_coding')
| -rw-r--r-- | weed/storage/erasure_coding/ec_encoder.go | 6 | ||||
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume.go | 38 | ||||
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume_delete.go | 98 |
3 files changed, 133 insertions, 9 deletions
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index 26130b4ba..97010a1ed 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -32,7 +32,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) { ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - return fmt.Errorf("failed to open dat file: %v", err) + return fmt.Errorf("failed to open ecx file: %v", err) } defer ecxFile.Close() @@ -43,7 +43,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) { }) if err != nil { - return fmt.Errorf("failed to open dat file: %v", err) + return fmt.Errorf("failed to visit ecx file: %v", err) } return nil @@ -202,7 +202,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi outputs, err := openEcFiles(baseFileName, false) defer closeEcFiles(outputs) if err != nil { - return fmt.Errorf("failed to open dat file: %v", err) + return fmt.Errorf("failed to open ec files %s: %v", baseFileName, err) } for remainingSize > largeBlockSize*DataShardsCount { diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index aea53f36e..acc9b1c37 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -1,6 +1,7 @@ package erasure_coding import ( + "errors" "fmt" "math" "os" @@ -14,6 +15,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/types" ) +var ( + NotFoundError = errors.New("needle not found") +) + type EcVolume struct { VolumeId needle.VolumeId Collection string @@ -26,6 +31,8 @@ type EcVolume struct { ShardLocationsRefreshTime time.Time ShardLocationsLock sync.RWMutex Version needle.Version + ecjFile *os.File + ecjFileAccessLock sync.Mutex } func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { @@ -34,8 +41,8 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu baseFileName := EcShardFileName(collection, dir, int(vid)) // open ecx file - if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil { - return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err) + if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err) } ecxFi, statErr := ev.ecxFile.Stat() if statErr != nil { @@ -44,6 +51,11 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu ev.ecxFileSize = ecxFi.Size() ev.ecxCreatedAt = ecxFi.ModTime() + // open ecj file + if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err) + } + ev.ShardLocations = make(map[ShardId][]string) return @@ -93,6 +105,12 @@ func (ev *EcVolume) Close() { for _, s := range ev.Shards { s.Close() } + if ev.ecjFile != nil { + ev.ecjFileAccessLock.Lock() + _ = ev.ecjFile.Close() + ev.ecjFile = nil + ev.ecjFileAccessLock.Unlock() + } if ev.ecxFile != nil { _ = ev.ecxFile.Close() ev.ecxFile = nil @@ -107,6 +125,7 @@ func (ev *EcVolume) Destroy() { s.Destroy() } os.Remove(ev.FileName() + ".ecx") + os.Remove(ev.FileName() + ".ecj") } func (ev *EcVolume) FileName() string { @@ -167,16 +186,23 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version } func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) { + return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil) +} + +func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) { var key types.NeedleId buf := make([]byte, types.NeedleMapEntrySize) - l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize + l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize for l < h { m := (l + h) / 2 - if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { - return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err) + if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { + return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err) } key, offset, size = idx.IdxFileEntry(buf) if key == needleId { + if processNeedleFn != nil { + err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize) + } return } if key < needleId { @@ -186,6 +212,6 @@ func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Off } } - err = fmt.Errorf("needle id %d not found", needleId) + err = NotFoundError return } diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go new file mode 100644 index 000000000..784dc2854 --- /dev/null +++ b/weed/storage/erasure_coding/ec_volume_delete.go @@ -0,0 +1,98 @@ +package erasure_coding + +import ( + "fmt" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + markNeedleDeleted = func(file *os.File, offset int64) error { + b := make([]byte, types.SizeSize) + util.Uint32toBytes(b, types.TombstoneFileSize) + n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize) + if err != nil { + return fmt.Errorf("ecx write error: %v", err) + } + if n != types.SizeSize { + return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize) + } + return nil + } +) + +func (ev *EcVolume) deleteNeedleFromEcx(needleId types.NeedleId) (err error) { + + _, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted) + + if err != nil { + if err == NotFoundError { + return nil + } + return err + } + + b := make([]byte, types.NeedleIdSize) + types.NeedleIdToBytes(b, needleId) + + ev.ecjFileAccessLock.Lock() + + ev.ecjFile.Seek(0, io.SeekEnd) + ev.ecjFile.Write(b) + + ev.ecjFileAccessLock.Unlock() + + return +} + +func RebuildEcxFile(baseFileName string) error { + + if !util.FileExists(baseFileName + ".ecj") { + return nil + } + + ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("rebuild: failed to open ecx file: %v", err) + } + defer ecxFile.Close() + + fstat, err := ecxFile.Stat() + if err != nil { + return err + } + + ecxFileSize := fstat.Size() + + ecjFile, err := os.OpenFile(baseFileName+".ecj", os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("rebuild: failed to open ecj file: %v", err) + } + + buf := make([]byte, types.NeedleIdSize) + for { + n, _ := ecjFile.Read(buf) + if n != types.NeedleIdSize { + break + } + + needleId := types.BytesToNeedleId(buf) + + _, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted) + + if err != nil && err != NotFoundError { + ecxFile.Close() + return err + } + + } + + ecxFile.Close() + + os.Remove(baseFileName + ".ecj") + + return nil +} |
