author    Chris Lu <chris.lu@gmail.com>  2019-05-28 23:48:39 -0700
committer Chris Lu <chris.lu@gmail.com>  2019-05-28 23:48:39 -0700
commit    5dd67f9acf51d3b1402d8a686747db0b19e45801 (patch)
tree      e0a1cc5832cc5bf43e6fd2f5c52fdc79f398cb1c
parent    3f9ecee40fd469f9669686752ea8c6b2b8090596 (diff)
reading by recovering from other shards
-rw-r--r--  weed/storage/erasure_coding/ec_test.go   2
-rw-r--r--  weed/storage/store_ec.go                 88
2 files changed, 80 insertions, 10 deletions
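
This commit adds a fallback read path: when an EC shard interval cannot be read directly from any of the shard's known locations, the volume server reconstructs it from the remaining shards. The math is Reed-Solomon erasure coding via the klauspost/reedsolomon library imported below. As a minimal, standalone sketch of that reconstruction (assuming SeaweedFS's 10 data + 4 parity shard layout; this is illustrative, not code from the commit):

    package main

    import (
        "bytes"
        "fmt"

        "github.com/klauspost/reedsolomon"
    )

    func main() {
        // 10 data shards + 4 parity shards, matching SeaweedFS's EC layout
        enc, err := reedsolomon.New(10, 4)
        if err != nil {
            panic(err)
        }

        // split a payload into 10 data shards, then compute the 4 parity shards
        shards, _ := enc.Split(bytes.Repeat([]byte("seaweedfs!"), 100))
        if err := enc.Encode(shards); err != nil {
            panic(err)
        }

        // lose one data shard; any 10 of the 14 shards can rebuild it
        want := append([]byte(nil), shards[3]...)
        shards[3] = nil

        // ReconstructData rebuilds missing data shards, which is all a read needs
        if err := enc.ReconstructData(shards); err != nil {
            panic(err)
        }
        fmt.Println("recovered:", bytes.Equal(shards[3], want))
    }
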
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index e2e872dbe..602ea0bc0 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -122,7 +122,7 @@ func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32)
 func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err error) {
-	ecFileOffset, ecFileIndex := interval.ToShardIdAndOffset(largeBlockSize, smallBlockSize)
+	ecFileIndex, ecFileOffset := interval.ToShardIdAndOffset(largeBlockSize, smallBlockSize)
 	data = make([]byte, interval.Size)
 	err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset)
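
The fix above swaps the destructured return values: ToShardIdAndOffset yields the shard index first and the intra-shard offset second, so the old code indexed ecFiles with an offset value and seeked by a shard id. For intuition, a hedged sketch of the striping math behind such a mapping (the round-robin block layout and parameters here are illustrative assumptions, not the library's actual implementation):

    // toShardIdAndOffset maps an absolute offset in a striped volume to
    // (shard index, offset within that shard), assuming fixed-size blocks
    // laid out round-robin across the data shards.
    func toShardIdAndOffset(offset, blockSize, dataShards int64) (shardId, shardOffset int64) {
        row := offset / (blockSize * dataShards) // which stripe of blocks
        col := (offset / blockSize) % dataShards // which shard in that stripe
        return col, row*blockSize + offset%blockSize
    }
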
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index ed531f206..db94e7b8b 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"sync"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -12,6 +13,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/klauspost/reedsolomon"
 )
 
 func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
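
The two new imports map onto the two halves of the recovery path: sync drives the parallel shard reads and reedsolomon does the reconstruction. A hedged sketch of the fan-out pattern the recovery code uses, with fetch as a hypothetical stand-in for the remote shard read (each goroutine writes only its own slot, so no lock is needed around results):

    import "sync"

    // fetchAll reads from every source concurrently and collects the
    // results by index; failed reads simply leave their slot nil.
    func fetchAll(sources []string, fetch func(string) ([]byte, error)) [][]byte {
        var wg sync.WaitGroup
        results := make([][]byte, len(sources))
        for i, src := range sources {
            wg.Add(1)
            go func(i int, src string) {
                defer wg.Done()
                if data, err := fetch(src); err == nil {
                    results[i] = data
                }
            }(i, src)
        }
        wg.Wait()
        return results
    }
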
@@ -160,11 +162,19 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId, interval erasure_coding.Interval) (data []byte, err error) {
 		if !found || len(sourceDataNodes) == 0 {
 			return nil, fmt.Errorf("failed to find ec shard %d.%d", ecVolume.VolumeId, shardId)
 		}
-		glog.V(4).Infof("read remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNodes[0])
-		_, err = s.readOneRemoteEcShardInterval(ctx, sourceDataNodes[0], ecVolume.VolumeId, shardId, data, actualOffset)
-		if err != nil {
-			glog.V(1).Infof("failed to read from %s for ec shard %d.%d : %v", sourceDataNodes[0], ecVolume.VolumeId, shardId, err)
+
+		// try reading directly from a node that has the shard
+		_, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
+		if err == nil {
+			return
+		}
+
+		// fall back to reconstructing the interval from the other shards
+		_, err = s.recoverOneRemoteEcShardInterval(ctx, ecVolume, shardId, data, actualOffset)
+		if err == nil {
+			return
 		}
+		glog.V(0).Infof("failed to recover ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
 	}
 	return
 }
@@ -203,7 +213,21 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
 	return
 }
-func (s *Store) readOneRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
+func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
+
+	for _, sourceDataNode := range sourceDataNodes {
+		glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
+		n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset)
+		if err == nil {
+			return
+		}
+		glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
+	}
+
+	return
+}
+
+func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
 	err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
@@ -239,8 +263,54 @@ func (s *Store) readOneRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
 	return
 }
-func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, string, vid needle.VolumeId, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
-	glog.V(1).Infof("recover ec shard %d.%d from other locations", vid, shardIdToRecover)
-	// TODO add recovering
-	return 0, fmt.Errorf("recover is not implemented yet")
+func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
+	glog.V(1).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
+
+	enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
+	if err != nil {
+		return 0, fmt.Errorf("failed to create encoder: %v", err)
+	}
+
+	bufs := make([][]byte, erasure_coding.TotalShardsCount)
+
+	var wg sync.WaitGroup
+	ecVolume.ShardLocationsLock.RLock()
+	for shardId, locations := range ecVolume.ShardLocations {
+
+		// skip the shard being recovered and shards with no known locations
+		if shardId == shardIdToRecover {
+			continue
+		}
+		if len(locations) == 0 {
+			glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
+			continue
+		}
+
+		// read the same interval from each shard's remote locations in
+		// parallel; keep results and errors goroutine-local so the concurrent
+		// reads do not race on the function's named return values
+		wg.Add(1)
+		go func(shardId erasure_coding.ShardId, locations []string) {
+			defer wg.Done()
+			data := make([]byte, len(buf))
+			nRead, readErr := s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
+			if readErr != nil {
+				glog.V(3).Infof("readRemoteEcShardInterval %d.%d from %+v: %v", ecVolume.VolumeId, shardId, locations, readErr)
+			}
+			if nRead == len(buf) {
+				bufs[shardId] = data
+			}
+		}(shardId, locations)
+	}
+	ecVolume.ShardLocationsLock.RUnlock()
+
+	wg.Wait()
+
+	// reconstruct the missing data shard from the shards that were read
+	if err = enc.ReconstructData(bufs); err != nil {
+		return 0, err
+	}
+	glog.V(3).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
+
+	copy(buf, bufs[shardIdToRecover])
+
+	return len(buf), nil
 }
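
One library detail worth noting about the recovery above: Encoder.ReconstructData rebuilds missing data shards only. That is sufficient here because the intervals being read map into data shards, but recovering a parity shard would need Reconstruct instead. A small hedged sketch of the distinction (recoverShard is illustrative, not SeaweedFS code):

    import "github.com/klauspost/reedsolomon"

    // recoverShard picks the reconstruction call based on whether the
    // missing shard index holds data or parity.
    func recoverShard(enc reedsolomon.Encoder, shards [][]byte, missing, dataShards int) error {
        if missing < dataShards {
            return enc.ReconstructData(shards) // rebuilds data shards only (cheaper)
        }
        return enc.Reconstruct(shards) // rebuilds data and parity shards
    }
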