aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/erasure_coding/ec_volume.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/erasure_coding/ec_volume.go')
-rw-r--r--weed/storage/erasure_coding/ec_volume.go38
1 files changed, 32 insertions, 6 deletions
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index aea53f36e..acc9b1c37 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -1,6 +1,7 @@
package erasure_coding
import (
+ "errors"
"fmt"
"math"
"os"
@@ -14,6 +15,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
+var (
+ NotFoundError = errors.New("needle not found")
+)
+
type EcVolume struct {
VolumeId needle.VolumeId
Collection string
@@ -26,6 +31,8 @@ type EcVolume struct {
ShardLocationsRefreshTime time.Time
ShardLocationsLock sync.RWMutex
Version needle.Version
+ ecjFile *os.File
+ ecjFileAccessLock sync.Mutex
}
func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
@@ -34,8 +41,8 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
baseFileName := EcShardFileName(collection, dir, int(vid))
// open ecx file
- if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil {
- return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err)
+ if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
@@ -44,6 +51,11 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
ev.ecxFileSize = ecxFi.Size()
ev.ecxCreatedAt = ecxFi.ModTime()
+ // open ecj file
+ if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
+ }
+
ev.ShardLocations = make(map[ShardId][]string)
return
@@ -93,6 +105,12 @@ func (ev *EcVolume) Close() {
for _, s := range ev.Shards {
s.Close()
}
+ if ev.ecjFile != nil {
+ ev.ecjFileAccessLock.Lock()
+ _ = ev.ecjFile.Close()
+ ev.ecjFile = nil
+ ev.ecjFileAccessLock.Unlock()
+ }
if ev.ecxFile != nil {
_ = ev.ecxFile.Close()
ev.ecxFile = nil
@@ -107,6 +125,7 @@ func (ev *EcVolume) Destroy() {
s.Destroy()
}
os.Remove(ev.FileName() + ".ecx")
+ os.Remove(ev.FileName() + ".ecj")
}
func (ev *EcVolume) FileName() string {
@@ -167,16 +186,23 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version
}
func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+ return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
+}
+
+func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
- l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize
+ l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
for l < h {
m := (l + h) / 2
- if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
- return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err)
+ if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
+ return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {
+ if processNeedleFn != nil {
+ err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
+ }
return
}
if key < needleId {
@@ -186,6 +212,6 @@ func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Off
}
}
- err = fmt.Errorf("needle id %d not found", needleId)
+ err = NotFoundError
return
}