aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-05-27 01:29:46 -0700
committerChris Lu <chris.lu@gmail.com>2019-05-27 01:29:46 -0700
commita4f3d82c57bca13321dca257891836ff36c7eca5 (patch)
treef65ffa52266d7ecc0d867c7bcdc404202e59a0f4
parenta463759edfbfafa04b3ae85ccb59ee27a6045ad4 (diff)
downloadseaweedfs-a4f3d82c57bca13321dca257891836ff36c7eca5.tar.xz
seaweedfs-a4f3d82c57bca13321dca257891836ff36c7eca5.zip
convert needle id to ec intervals to read from
-rw-r--r--weed/server/volume_grpc_client_to_master.go3
-rw-r--r--weed/server/volume_server_handlers_read.go17
-rw-r--r--weed/storage/disk_location_ec.go11
-rw-r--r--weed/storage/erasure_coding/ec_locate.go27
-rw-r--r--weed/storage/erasure_coding/ec_shard.go107
-rw-r--r--weed/storage/erasure_coding/ec_test.go26
-rw-r--r--weed/storage/erasure_coding/ec_volume.go74
-rw-r--r--weed/storage/store.go23
-rw-r--r--weed/storage/store_ec.go32
9 files changed, 215 insertions, 105 deletions
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 571c5716c..0fa61d71d 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -66,9 +66,6 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
glog.V(0).Infof("Heartbeat to: %v", masterNode)
vs.currentMaster = masterNode
- vs.store.Client = stream
- defer func() { vs.store.Client = nil }()
-
doneChan := make(chan error, 1)
go func() {
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 816afcb8b..4af63e2ce 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -40,7 +40,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
glog.V(4).Infoln("volume", volumeId, "reading", n)
- if !vs.store.HasVolume(volumeId) {
+ hasVolume := vs.store.HasVolume(volumeId)
+ _, hasEcShard := vs.store.HasEcShard(volumeId)
+ if !hasVolume && !hasEcShard {
if !vs.ReadRedirect {
glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
@@ -65,10 +67,15 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
- count, e := vs.store.ReadVolumeNeedle(volumeId, n)
- glog.V(4).Infoln("read bytes", count, "error", e)
- if e != nil || count < 0 {
- glog.V(0).Infof("read %s error: %v", r.URL.Path, e)
+ var count int
+ if hasVolume {
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+ } else if hasEcShard {
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ }
+ glog.V(4).Infoln("read bytes", count, "error", err)
+ if err != nil || count < 0 {
+ glog.V(0).Infof("read %s error: %v", r.URL.Path, err)
w.WriteHeader(http.StatusNotFound)
return
}
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 05acbb98b..e91c0f262 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -16,6 +16,17 @@ var (
re = regexp.MustCompile("\\.ec[0-9][0-9]")
)
+func (l *DiskLocation) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) {
+ l.ecShardsLock.RLock()
+ defer l.ecShardsLock.RUnlock()
+
+ ecShards, ok := l.ecShards[vid]
+ if ok {
+ return ecShards, true
+ }
+ return nil, false
+}
+
func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
l.ecShardsLock.RLock()
defer l.ecShardsLock.RUnlock()
diff --git a/weed/storage/erasure_coding/ec_locate.go b/weed/storage/erasure_coding/ec_locate.go
index b570f750c..ee8af3382 100644
--- a/weed/storage/erasure_coding/ec_locate.go
+++ b/weed/storage/erasure_coding/ec_locate.go
@@ -1,22 +1,25 @@
package erasure_coding
type Interval struct {
- blockIndex int
- innerBlockOffset int64
- size uint32
- isLargeBlock bool
+ BlockIndex int
+ InnerBlockOffset int64
+ Size uint32
+ IsLargeBlock bool
+ LargeBlockRowsCount int
}
-func locateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
+func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
- nLargeBlockRows := int(datSize / (largeBlockLength * DataShardsCount))
+ // adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size
+ nLargeBlockRows := int((datSize + DataShardsCount*smallBlockLength) / (largeBlockLength * DataShardsCount))
for size > 0 {
interval := Interval{
- blockIndex: blockIndex,
- innerBlockOffset: innerBlockOffset,
- isLargeBlock: isLargeBlock,
+ BlockIndex: blockIndex,
+ InnerBlockOffset: innerBlockOffset,
+ IsLargeBlock: isLargeBlock,
+ LargeBlockRowsCount: nLargeBlockRows,
}
blockRemaining := largeBlockLength - innerBlockOffset
@@ -25,14 +28,14 @@ func locateData(largeBlockLength, smallBlockLength int64, datSize int64, offset
}
if int64(size) <= blockRemaining {
- interval.size = size
+ interval.Size = size
intervals = append(intervals, interval)
return
}
- interval.size = uint32(blockRemaining)
+ interval.Size = uint32(blockRemaining)
intervals = append(intervals, interval)
- size -= interval.size
+ size -= interval.Size
blockIndex += 1
if isLargeBlock && blockIndex == nLargeBlockRows*DataShardsCount {
isLargeBlock = false
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
new file mode 100644
index 000000000..084d3f03b
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -0,0 +1,107 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "os"
+ "path"
+ "strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type ShardId uint8
+
+type EcVolumeShard struct {
+ VolumeId needle.VolumeId
+ ShardId ShardId
+ Collection string
+ dir string
+ ecdFile *os.File
+ ecdFileSize int64
+ ecxFile *os.File
+ ecxFileSize int64
+}
+
+func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
+
+ v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
+
+ baseFileName := v.FileName()
+
+ // open ecx file
+ if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
+ return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
+ }
+ ecxFi, statErr := v.ecxFile.Stat()
+ if statErr != nil {
+ return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
+ }
+ v.ecxFileSize = ecxFi.Size()
+
+ // open ecd file
+ if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
+ return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
+ }
+ ecdFi, statErr := v.ecdFile.Stat()
+ if statErr != nil {
+ return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr)
+ }
+ v.ecdFileSize = ecdFi.Size()
+
+ return
+}
+
+func (shard *EcVolumeShard) String() string {
+ return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", shard.VolumeId, shard.ShardId, shard.dir, shard.Collection)
+}
+
+func (shard *EcVolumeShard) FileName() (fileName string) {
+ return EcShardFileName(shard.Collection, shard.dir, int(shard.VolumeId))
+}
+
+func EcShardFileName(collection string, dir string, id int) (fileName string) {
+ idString := strconv.Itoa(id)
+ if collection == "" {
+ fileName = path.Join(dir, idString)
+ } else {
+ fileName = path.Join(dir, collection+"_"+idString)
+ }
+ return
+}
+
+func (shard *EcVolumeShard) Close() {
+ if shard.ecdFile != nil {
+ _ = shard.ecdFile.Close()
+ shard.ecdFile = nil
+ }
+ if shard.ecxFile != nil {
+ _ = shard.ecxFile.Close()
+ shard.ecxFile = nil
+ }
+}
+
+func (shard *EcVolumeShard) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+ var key types.NeedleId
+ buf := make([]byte, types.NeedleMapEntrySize)
+ l, h := int64(0), shard.ecxFileSize/types.NeedleMapEntrySize
+ for l < h {
+ m := (l + h) / 2
+ if _, err := shard.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
+ return types.Offset{}, 0, err
+ }
+ key, offset, size = idx.IdxFileEntry(buf)
+ if key == needleId {
+ return
+ }
+ if key < needleId {
+ l = m + 1
+ } else {
+ h = m
+ }
+ }
+
+ err = fmt.Errorf("needle id %d not found", needleId)
+ return
+}
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index ecf73ac96..83b0bc23a 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -103,7 +103,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, er
func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) (data []byte, err error) {
- intervals := locateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
+ intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
nLargeBlockRows := int(datSize / (largeBlockSize * DataShardsCount))
@@ -123,20 +123,20 @@ func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uin
}
func readOneInterval(interval Interval, ecFiles []*os.File, nLargeBlockRows int) (data []byte, err error) {
- ecFileOffset := interval.innerBlockOffset
- rowIndex := interval.blockIndex / DataShardsCount
- if interval.isLargeBlock {
+ ecFileOffset := interval.InnerBlockOffset
+ rowIndex := interval.BlockIndex / DataShardsCount
+ if interval.IsLargeBlock {
ecFileOffset += int64(rowIndex) * largeBlockSize
} else {
ecFileOffset += int64(nLargeBlockRows)*largeBlockSize + int64(rowIndex)*smallBlockSize
}
- ecFileIndex := interval.blockIndex % DataShardsCount
+ ecFileIndex := interval.BlockIndex % DataShardsCount
- data = make([]byte, interval.size)
+ data = make([]byte, interval.Size)
err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset)
{ // do some ec testing
- ecData, err := readFromOtherEcFiles(ecFiles, ecFileIndex, ecFileOffset, interval.size)
+ ecData, err := readFromOtherEcFiles(ecFiles, ecFileIndex, ecFileOffset, interval.Size)
if err != nil {
return nil, fmt.Errorf("ec reconstruct error: %v", err)
}
@@ -194,7 +194,7 @@ func removeGeneratedFiles(baseFileName string) {
}
func TestLocateData(t *testing.T) {
- intervals := locateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
+ intervals := LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
if len(intervals) != 1 {
t.Errorf("unexpected interval size %d", len(intervals))
}
@@ -202,13 +202,13 @@ func TestLocateData(t *testing.T) {
t.Errorf("unexpected interval %+v", intervals[0])
}
- intervals = locateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
+ intervals = LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
fmt.Printf("%+v\n", intervals)
}
func (this Interval) sameAs(that Interval) bool {
- return this.isLargeBlock == that.isLargeBlock &&
- this.innerBlockOffset == that.innerBlockOffset &&
- this.blockIndex == that.blockIndex &&
- this.size == that.size
+ return this.IsLargeBlock == that.IsLargeBlock &&
+ this.InnerBlockOffset == that.InnerBlockOffset &&
+ this.BlockIndex == that.BlockIndex &&
+ this.Size == that.Size
}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 11dcd0860..d57a28449 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -1,44 +1,15 @@
package erasure_coding
import (
- "fmt"
"math"
- "os"
- "path"
"sort"
- "strconv"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
-type ShardId uint8
-
-type EcVolumeShard struct {
- VolumeId needle.VolumeId
- ShardId ShardId
- Collection string
- dir string
- ecdFile *os.File
- ecxFile *os.File
-}
type EcVolumeShards []*EcVolumeShard
-func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
-
- v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
-
- baseFileName := v.FileName()
- if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
- return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
- }
- if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
- return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
- }
-
- return
-}
-
func (shards *EcVolumeShards) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
for _, s := range *shards {
if s.ShardId == ecVolumeShard.ShardId {
@@ -68,6 +39,15 @@ func (shards *EcVolumeShards) DeleteEcVolumeShard(ecVolumeShard *EcVolumeShard)
return true
}
+func (shards *EcVolumeShards) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
+ for _, s := range *shards {
+ if s.ShardId == shardId {
+ return s, true
+ }
+ }
+ return nil, false
+}
+
func (shards *EcVolumeShards) Close() {
for _, s := range *shards {
s.Close()
@@ -91,31 +71,19 @@ func (shards *EcVolumeShards) ToVolumeEcShardInformationMessage() (messages []*m
return
}
-func (v *EcVolumeShard) String() string {
- return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", v.VolumeId, v.ShardId, v.dir, v.Collection)
-}
-
-func (v *EcVolumeShard) FileName() (fileName string) {
- return EcShardFileName(v.Collection, v.dir, int(v.VolumeId))
-}
+func (shards *EcVolumeShards) ReadEcShardNeedle(n *needle.Needle) (int, error) {
-func EcShardFileName(collection string, dir string, id int) (fileName string) {
- idString := strconv.Itoa(id)
- if collection == "" {
- fileName = path.Join(dir, idString)
- } else {
- fileName = path.Join(dir, collection+"_"+idString)
+ shard := (*shards)[0]
+ // find the needle from ecx file
+ offset, size, err := shard.findNeedleFromEcx(n.Id)
+ if err != nil {
+ return 0, err
}
- return
-}
-func (v *EcVolumeShard) Close() {
- if v.ecdFile != nil {
- _ = v.ecdFile.Close()
- v.ecdFile = nil
- }
- if v.ecxFile != nil {
- _ = v.ecxFile.Close()
- v.ecxFile = nil
- }
+ // calculate the locations in the ec shards
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shard.ecxFileSize, offset.ToAcutalOffset(), size)
+
+ // TODO read the intervals
+
+ return len(intervals), nil
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index ad8f2d6b9..d5474d87f 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -18,18 +18,17 @@ const (
* A VolumeServer contains one Store
*/
type Store struct {
- volumeSizeLimit uint64 //read from the master
- Ip string
- Port int
- PublicUrl string
- Locations []*DiskLocation
- dataCenter string //optional informaton, overwriting master setting if exists
- rack string //optional information, overwriting master setting if exists
- connected bool
- Client master_pb.Seaweed_SendHeartbeatClient
- NeedleMapType NeedleMapType
- NewVolumesChan chan master_pb.VolumeShortInformationMessage
- DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
+ volumeSizeLimit uint64 //read from the master
+ Ip string
+ Port int
+ PublicUrl string
+ Locations []*DiskLocation
+ dataCenter string //optional informaton, overwriting master setting if exists
+ rack string //optional information, overwriting master setting if exists
+ connected bool
+ NeedleMapType NeedleMapType
+ NewVolumesChan chan master_pb.VolumeShortInformationMessage
+ DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
}
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index ad935609d..ed7c6484b 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -33,9 +33,9 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
var shardBits erasure_coding.ShardBits
s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
- Id: uint32(vid),
- Collection: collection,
- EcIndexBits: uint32(shardBits.AddShardId(shardId)),
+ Id: uint32(vid),
+ Collection: collection,
+ EcIndexBits: uint32(shardBits.AddShardId(shardId)),
}
return nil
}
@@ -53,9 +53,9 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
var shardBits erasure_coding.ShardBits
message := master_pb.VolumeEcShardInformationMessage{
- Id: uint32(vid),
- Collection: ecShard.Collection,
- EcIndexBits: uint32(shardBits.AddShardId(shardId)),
+ Id: uint32(vid),
+ Collection: ecShard.Collection,
+ EcIndexBits: uint32(shardBits.AddShardId(shardId)),
}
for _, location := range s.Locations {
@@ -69,7 +69,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
}
-func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
+func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
for _, location := range s.Locations {
if v, found := location.FindEcShard(vid, shardId); found {
return v, found
@@ -77,3 +77,21 @@ func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId)
}
return nil, false
}
+
+func (s *Store) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) {
+ for _, location := range s.Locations {
+ if s, found := location.HasEcShard(vid); found {
+ return s, true
+ }
+ }
+ return nil, false
+}
+
+func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
+ for _, location := range s.Locations {
+ if ecShards, found := location.HasEcShard(vid); found {
+ return ecShards.ReadEcShardNeedle(n)
+ }
+ }
+ return 0, fmt.Errorf("ec shard %d not found", vid)
+}