aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-05-27 22:00:36 -0700
committerChris Lu <chris.lu@gmail.com>2019-05-27 22:00:36 -0700
commit3a8c1055a24094d22bdc33bdd01435097a6268a9 (patch)
treee18b897e530459e5bc94958b95cff5622ebc1672
parent217cde0a3bdb676e4a68959ca2aec0e7c82e2f7d (diff)
downloadseaweedfs-3a8c1055a24094d22bdc33bdd01435097a6268a9.tar.xz
seaweedfs-3a8c1055a24094d22bdc33bdd01435097a6268a9.zip
refactoring ecx to ecVolume
-rw-r--r--weed/storage/disk_location_ec.go11
-rw-r--r--weed/storage/erasure_coding/ec_shard.go42
-rw-r--r--weed/storage/erasure_coding/ec_volume.go63
3 files changed, 69 insertions, 47 deletions
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 2aec1502b..bcc09a1c0 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -50,7 +50,15 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
}
l.ecVolumesLock.Lock()
- l.ecVolumes[vid].AddEcVolumeShard(ecVolumeShard)
+ ecVolume, found := l.ecVolumes[vid]
+ if !found {
+ ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid)
+ if err != nil {
+ return fmt.Errorf("failed to create ec volume %d: %v", vid, err)
+ }
+ l.ecVolumes[vid] = ecVolume
+ }
+ ecVolume.AddEcVolumeShard(ecVolumeShard)
l.ecVolumesLock.Unlock()
return nil
@@ -69,6 +77,7 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding
if len(ecVolume.Shards) == 0 {
delete(l.ecVolumes, vid)
}
+ ecVolume.Close()
return true
}
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
index fa4bfcecd..037c4ba48 100644
--- a/weed/storage/erasure_coding/ec_shard.go
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -6,9 +6,7 @@ import (
"path"
"strconv"
- "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
)
type ShardId uint8
@@ -20,8 +18,6 @@ type EcVolumeShard struct {
dir string
ecdFile *os.File
ecdFileSize int64
- ecxFile *os.File
- ecxFileSize int64
}
func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
@@ -30,16 +26,6 @@ func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, sha
baseFileName := v.FileName()
- // open ecx file
- if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
- return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
- }
- ecxFi, statErr := v.ecxFile.Stat()
- if statErr != nil {
- return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
- }
- v.ecxFileSize = ecxFi.Size()
-
// open ecd file
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
@@ -76,34 +62,6 @@ func (shard *EcVolumeShard) Close() {
_ = shard.ecdFile.Close()
shard.ecdFile = nil
}
- if shard.ecxFile != nil {
- _ = shard.ecxFile.Close()
- shard.ecxFile = nil
- }
-}
-
-func (shard *EcVolumeShard) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
- var key types.NeedleId
- buf := make([]byte, types.NeedleMapEntrySize)
- l, h := int64(0), shard.ecxFileSize/types.NeedleMapEntrySize
- for l < h {
- m := (l + h) / 2
- if _, err := shard.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
- return types.Offset{}, 0, err
- }
- key, offset, size = idx.IdxFileEntry(buf)
- if key == needleId {
- return
- }
- if key < needleId {
- l = m + 1
- } else {
- h = m
- }
- }
-
- err = fmt.Errorf("needle id %d not found", needleId)
- return
}
func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 5fba31f95..b4432c884 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -1,16 +1,42 @@
package erasure_coding
import (
+ "fmt"
"math"
+ "os"
"sort"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
type EcVolume struct {
- Shards []*EcVolumeShard
+ Shards []*EcVolumeShard
+ VolumeId needle.VolumeId
+ Collection string
+ dir string
+ ecxFile *os.File
+ ecxFileSize int64
+}
+
+func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
+ ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
+
+ baseFileName := EcShardFileName(collection, dir, int(vid))
+
+ // open ecx file
+ if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil {
+ return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err)
+ }
+ ecxFi, statErr := ev.ecxFile.Stat()
+ if statErr != nil {
+ return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
+ }
+ ev.ecxFileSize = ecxFi.Size()
+
+ return
}
func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
@@ -55,6 +81,10 @@ func (ev *EcVolume) Close() {
for _, s := range ev.Shards {
s.Close()
}
+ if ev.ecxFile != nil {
+ _ = ev.ecxFile.Close()
+ ev.ecxFile = nil
+ }
}
func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
@@ -76,15 +106,40 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle) (offset types.Offset, size uint32, intervals []Interval, err error) {
- shard := ev.Shards[0]
// find the needle from ecx file
- offset, size, err = shard.findNeedleFromEcx(n.Id)
+ offset, size, err = ev.findNeedleFromEcx(n.Id)
if err != nil {
return types.Offset{}, 0, nil, err
}
+ shard := ev.Shards[0]
+
// calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shard.ecxFileSize, offset.ToAcutalOffset(), size)
+ intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), size)
+
+ return
+}
+
+func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+ var key types.NeedleId
+ buf := make([]byte, types.NeedleMapEntrySize)
+ l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize
+ for l < h {
+ m := (l + h) / 2
+ if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
+ return types.Offset{}, 0, err
+ }
+ key, offset, size = idx.IdxFileEntry(buf)
+ if key == needleId {
+ return
+ }
+ if key < needleId {
+ l = m + 1
+ } else {
+ h = m
+ }
+ }
+ err = fmt.Errorf("needle id %d not found", needleId)
return
}