diff options
Diffstat (limited to 'weed/storage/store_ec.go')
| -rw-r--r-- | weed/storage/store_ec.go | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 47e061d05..e423e7dca 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -116,7 +116,7 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) { } } -func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) { +func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) { for _, location := range s.Locations { if localEcVolume, found := location.FindEcVolume(vid); found { @@ -133,7 +133,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n if len(intervals) > 1 { glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals) } - bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals) + bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals) if err != nil { return 0, fmt.Errorf("ReadEcShardIntervals: %v", err) } @@ -152,14 +152,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n return 0, fmt.Errorf("ec shard %d not found", vid) } -func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) { +func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) { - if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil { + if err = s.cachedLookupEcShardLocations(ecVolume); err != nil { return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err) } for i, interval := range intervals { - if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil { + if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil { return nil, isDeleted, e } else { if isDeleted { @@ -175,7 +175,7 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, n return } -func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) { +func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) { shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) data = make([]byte, interval.Size) if shard, found := ecVolume.FindEcVolumeShard(shardId); found { @@ -190,7 +190,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl // try reading directly if hasShardIdLocation { - _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset) + _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset) if err == nil { return } @@ -199,7 +199,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl } // try reading by recovering from other shards - _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset) + _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset) if err == nil { return } @@ -215,7 +215,7 @@ func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.Sha ecVolume.ShardLocationsLock.Unlock() } -func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) { +func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) { shardCount := len(ecVolume.ShardLocations) if shardCount < erasure_coding.DataShardsCount && @@ -230,11 +230,11 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId) - err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { + err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupEcVolumeRequest{ VolumeId: uint32(ecVolume.VolumeId), } - resp, err := masterClient.LookupEcVolume(ctx, req) + resp, err := masterClient.LookupEcVolume(context.Background(), req) if err != nil { return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err) } @@ -258,7 +258,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras return } -func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { if len(sourceDataNodes) == 0 { return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId) @@ -266,7 +266,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [ for _, sourceDataNode := range sourceDataNodes { glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode) - n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset) + n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset) if err == nil { return } @@ -276,12 +276,12 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [ return } -func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { - err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice - shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{ + shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{ VolumeId: uint32(vid), ShardId: uint32(shardId), Offset: offset, @@ -316,7 +316,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode return } -func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover) enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) @@ -344,7 +344,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId ty go func(shardId erasure_coding.ShardId, locations []string) { defer wg.Done() data := make([]byte, len(buf)) - nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset) + nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset) if readErr != nil { glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr) forgetShardId(ecVolume, shardId) |
