diff options
Diffstat (limited to 'weed/storage/store_ec.go')
| -rw-r--r-- | weed/storage/store_ec.go | 57 |
1 files changed, 55 insertions, 2 deletions
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index ed7c6484b..7514339c9 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -89,9 +89,62 @@ func (s *Store) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) { for _, location := range s.Locations { - if ecShards, found := location.HasEcShard(vid); found { - return ecShards.ReadEcShardNeedle(n) + if localEcShards, found := location.HasEcShard(vid); found { + + offset, size, intervals, err := localEcShards.LocateEcShardNeedle(n) + if err != nil { + return 0, err + } + + bytes, err := s.ReadEcShardIntervals(vid, localEcShards, intervals) + if err != nil { + return 0, fmt.Errorf("ReadEcShardIntervals: %v", err) + } + + version := needle.CurrentVersion + + err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version) + if err != nil { + return 0, fmt.Errorf("readbytes: %v", err) + } + + return len(bytes), nil } } return 0, fmt.Errorf("ec shard %d not found", vid) } + +func (s *Store) ReadEcShardIntervals(vid needle.VolumeId, localEcShards erasure_coding.EcVolumeShards, intervals []erasure_coding.Interval) (data []byte, err error) { + for i, interval := range intervals { + if d, e := s.readOneEcShardInterval(vid, localEcShards, interval); e != nil { + return nil, e + } else { + if i == 0 { + data = d + } else { + data = append(data, d...) + } + } + } + return +} + +func (s *Store) readOneEcShardInterval(vid needle.VolumeId, localEcShards erasure_coding.EcVolumeShards, interval erasure_coding.Interval) (data []byte, err error) { + shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) + data = make([]byte, interval.Size) + if shard, found := localEcShards.FindEcVolumeShard(shardId); found { + if _, err = shard.ReadAt(data, actualOffset); err != nil { + return + } + } else { + s.readOneRemoteEcShardInterval(vid, shardId, data, actualOffset) + } + return +} + +func (s *Store) readOneRemoteEcShardInterval(vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) { + + + + return +} |
