diff options
Diffstat (limited to 'weed/storage/erasure_coding/ec_volume.go')
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume.go | 38 |
1 files changed, 32 insertions, 6 deletions
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index aea53f36e..acc9b1c37 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -1,6 +1,7 @@ package erasure_coding import ( + "errors" "fmt" "math" "os" @@ -14,6 +15,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/types" ) +var ( + NotFoundError = errors.New("needle not found") +) + type EcVolume struct { VolumeId needle.VolumeId Collection string @@ -26,6 +31,8 @@ type EcVolume struct { ShardLocationsRefreshTime time.Time ShardLocationsLock sync.RWMutex Version needle.Version + ecjFile *os.File + ecjFileAccessLock sync.Mutex } func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { @@ -34,8 +41,8 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu baseFileName := EcShardFileName(collection, dir, int(vid)) // open ecx file - if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil { - return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err) + if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err) } ecxFi, statErr := ev.ecxFile.Stat() if statErr != nil { @@ -44,6 +51,11 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu ev.ecxFileSize = ecxFi.Size() ev.ecxCreatedAt = ecxFi.ModTime() + // open ecj file + if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err) + } + ev.ShardLocations = make(map[ShardId][]string) return @@ -93,6 +105,12 @@ func (ev *EcVolume) Close() { for _, s := range ev.Shards { s.Close() } + if ev.ecjFile != nil { + ev.ecjFileAccessLock.Lock() + _ = ev.ecjFile.Close() + ev.ecjFile = nil + ev.ecjFileAccessLock.Unlock() + } if ev.ecxFile != nil { _ = ev.ecxFile.Close() ev.ecxFile = nil @@ -107,6 +125,7 @@ func (ev *EcVolume) Destroy() { s.Destroy() } os.Remove(ev.FileName() + ".ecx") + os.Remove(ev.FileName() + ".ecj") } func (ev *EcVolume) FileName() string { @@ -167,16 +186,23 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version } func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) { + return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil) +} + +func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) { var key types.NeedleId buf := make([]byte, types.NeedleMapEntrySize) - l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize + l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize for l < h { m := (l + h) / 2 - if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { - return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err) + if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { + return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err) } key, offset, size = idx.IdxFileEntry(buf) if key == needleId { + if processNeedleFn != nil { + err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize) + } return } if key < needleId { @@ -186,6 +212,6 @@ func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Off } } - err = fmt.Errorf("needle id %d not found", needleId) + err = NotFoundError return } |
