aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/store_ec.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/store_ec.go')
-rw-r--r--weed/storage/store_ec.go63
1 files changed, 20 insertions, 43 deletions
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 7e3f1a46c..e423e7dca 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -8,6 +8,8 @@ import (
"sync"
"time"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -16,7 +18,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/klauspost/reedsolomon"
)
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
@@ -115,19 +116,11 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
}
}
-func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
+func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
for _, location := range s.Locations {
if localEcVolume, found := location.FindEcVolume(vid); found {
- // read the volume version
- for localEcVolume.Version == 0 {
- err := s.readEcVolumeVersion(ctx, vid, localEcVolume)
- time.Sleep(1357 * time.Millisecond)
- glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err)
- }
- version := localEcVolume.Version
-
- offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, version)
+ offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, localEcVolume.Version)
if err != nil {
return 0, fmt.Errorf("locate in local ec volume: %v", err)
}
@@ -140,7 +133,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
if len(intervals) > 1 {
glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
}
- bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals)
+ bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
if err != nil {
return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
}
@@ -148,7 +141,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
return 0, fmt.Errorf("ec entry %s is deleted", n.Id)
}
- err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
+ err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version)
if err != nil {
return 0, fmt.Errorf("readbytes: %v", err)
}
@@ -159,30 +152,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
return 0, fmt.Errorf("ec shard %d not found", vid)
}
-func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) {
-
- interval := erasure_coding.Interval{
- BlockIndex: 0,
- InnerBlockOffset: 0,
- Size: _SuperBlockSize,
- IsLargeBlock: true, // it could be large block, but ok in this place
- LargeBlockRowsCount: 0,
- }
- data, _, err := s.readEcShardIntervals(ctx, vid, 0, ecVolume, []erasure_coding.Interval{interval})
- if err == nil {
- ecVolume.Version = needle.Version(data[0])
- }
- return
-}
-
-func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
+func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
- if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
+ if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
}
for i, interval := range intervals {
- if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil {
+ if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil {
return nil, isDeleted, e
} else {
if isDeleted {
@@ -198,7 +175,7 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, n
return
}
-func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
+func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
data = make([]byte, interval.Size)
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
@@ -213,7 +190,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl
// try reading directly
if hasShardIdLocation {
- _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
+ _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
if err == nil {
return
}
@@ -222,7 +199,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl
}
// try reading by recovering from other shards
- _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset)
+ _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
if err == nil {
return
}
@@ -238,7 +215,7 @@ func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.Sha
ecVolume.ShardLocationsLock.Unlock()
}
-func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
+func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) {
shardCount := len(ecVolume.ShardLocations)
if shardCount < erasure_coding.DataShardsCount &&
@@ -257,7 +234,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
req := &master_pb.LookupEcVolumeRequest{
VolumeId: uint32(ecVolume.VolumeId),
}
- resp, err := masterClient.LookupEcVolume(ctx, req)
+ resp, err := masterClient.LookupEcVolume(context.Background(), req)
if err != nil {
return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
}
@@ -281,7 +258,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
return
}
-func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
if len(sourceDataNodes) == 0 {
return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
@@ -289,7 +266,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
for _, sourceDataNode := range sourceDataNodes {
glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
- n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
+ n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset)
if err == nil {
return
}
@@ -299,12 +276,12 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
return
}
-func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
- shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
+ shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
VolumeId: uint32(vid),
ShardId: uint32(shardId),
Offset: offset,
@@ -339,7 +316,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
return
}
-func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
@@ -367,7 +344,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId ty
go func(shardId erasure_coding.ShardId, locations []string) {
defer wg.Done()
data := make([]byte, len(buf))
- nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset)
+ nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
if readErr != nil {
glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
forgetShardId(ecVolume, shardId)