aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/erasure_coding/ec_volume.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/erasure_coding/ec_volume.go')
-rw-r--r--weed/storage/erasure_coding/ec_volume.go62
1 files changed, 42 insertions, 20 deletions
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 579f037fb..171db92a4 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -25,6 +25,7 @@ type EcVolume struct {
VolumeId needle.VolumeId
Collection string
dir string
+ dirIdx string
ecxFile *os.File
ecxFileSize int64
ecxCreatedAt time.Time
@@ -35,35 +36,37 @@ type EcVolume struct {
Version needle.Version
ecjFile *os.File
ecjFileAccessLock sync.Mutex
+ diskType types.DiskType
}
-func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
- ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
+func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
+ ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
- baseFileName := EcShardFileName(collection, dir, int(vid))
+ dataBaseFileName := EcShardFileName(collection, dir, int(vid))
+ indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
// open ecx file
- if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
+ if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
- return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
+ return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
}
ev.ecxFileSize = ecxFi.Size()
ev.ecxCreatedAt = ecxFi.ModTime()
// open ecj file
- if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
+ if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
}
// read volume info
ev.Version = needle.Version3
- if volumeInfo, found := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
+ if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
} else {
- pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
+ pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
}
ev.ShardLocations = make(map[ShardId][]string)
@@ -134,24 +137,42 @@ func (ev *EcVolume) Destroy() {
for _, s := range ev.Shards {
s.Destroy()
}
- os.Remove(ev.FileName() + ".ecx")
- os.Remove(ev.FileName() + ".ecj")
- os.Remove(ev.FileName() + ".vif")
+ os.Remove(ev.FileName(".ecx"))
+ os.Remove(ev.FileName(".ecj"))
+ os.Remove(ev.FileName(".vif"))
}
-func (ev *EcVolume) FileName() string {
+func (ev *EcVolume) FileName(ext string) string {
+ switch ext {
+ case ".ecx", ".ecj":
+ return ev.IndexBaseFileName() + ext
+ }
+ // .vif
+ return ev.DataBaseFileName() + ext
+}
+func (ev *EcVolume) DataBaseFileName() string {
return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
+}
+func (ev *EcVolume) IndexBaseFileName() string {
+ return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
}
-func (ev *EcVolume) ShardSize() int64 {
+func (ev *EcVolume) ShardSize() uint64 {
if len(ev.Shards) > 0 {
- return ev.Shards[0].Size()
+ return uint64(ev.Shards[0].Size())
}
return 0
}
+func (ev *EcVolume) Size() (size int64) {
+ for _, shard := range ev.Shards {
+ size += shard.Size()
+ }
+ return
+}
+
func (ev *EcVolume) CreatedAt() time.Time {
return ev.ecxCreatedAt
}
@@ -171,6 +192,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
m = &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
Collection: s.Collection,
+ DiskType: string(ev.diskType),
}
messages = append(messages, m)
}
@@ -180,7 +202,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
return
}
-func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) {
+func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
// find the needle from ecx file
offset, size, err = ev.FindNeedleFromEcx(needleId)
@@ -191,16 +213,16 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
shard := ev.Shards[0]
// calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version)))
+ intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
return
}
-func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
-func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
+func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize