diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-05-27 11:59:03 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-05-27 11:59:03 -0700 |
| commit | b4b407e4038943ca5b7dc440d2848f23c11b73ca (patch) | |
| tree | f90a49fa2cac3361efa224b13c0ecfaade054b76 /weed/server | |
| parent | a4f3d82c57bca13321dca257891836ff36c7eca5 (diff) | |
| download | seaweedfs-b4b407e4038943ca5b7dc440d2848f23c11b73ca.tar.xz seaweedfs-b4b407e4038943ca5b7dc440d2848f23c11b73ca.zip | |
add grpc ec shard read
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 5 | ||||
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 59 | ||||
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 5 |
3 files changed, 58 insertions, 11 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 7b681aa53..e5a3d6edf 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -16,6 +16,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +const BufferSizeLimit = 1024 * 1024 * 2 + // VolumeCopy copy the .idx .dat files, and mount the volume func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) { @@ -190,7 +192,6 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v bytesToRead := int64(req.StopOffset) - const BufferSize = 1024 * 1024 * 2 var fileName = v.FileName() + req.Ext file, err := os.Open(fileName) if err != nil { @@ -198,7 +199,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v } defer file.Close() - buffer := make([]byte, BufferSize) + buffer := make([]byte, BufferSizeLimit) for bytesToRead > 0 { bytesread, err := file.Read(buffer) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index f82b07e29..aa0f80442 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "io" "math" "os" @@ -71,8 +72,8 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv } // copy ec data slices - for _, ecIndex := range req.EcIndexes { - if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(ecIndex))); err != nil { + for _, shardId := range req.ShardIds { + if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil { return err } } @@ -95,8 +96,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se } baseFileName := v.FileName() - for _, shardIndex := range req.EcIndexes { - if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardIndex))); err != nil { + for _, shardId := range req.ShardIds { + if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardId))); err != nil { return nil, err } } @@ -112,7 +113,7 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) { - for _, shardId := range req.EcIndexes { + for _, shardId := range req.ShardIds { err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)) if err != nil { @@ -131,7 +132,7 @@ func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_ser func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) { - for _, shardId := range req.EcIndexes { + for _, shardId := range req.ShardIds { err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)) if err != nil { @@ -147,3 +148,49 @@ func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_s return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil } + +func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error { + + ecShards, found := vs.store.HasEcShard(needle.VolumeId(req.VolumeId)) + if !found { + return fmt.Errorf("not found ec volume id %d", req.VolumeId) + } + ecShard, found := ecShards.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId)) + if !found { + return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId) + } + + buffer := make([]byte, BufferSizeLimit) + startOffset, bytesToRead := req.Offset, req.Size + + for bytesToRead > 0 { + bytesread, err := ecShard.ReadAt(buffer, startOffset) + + // println(fileName, "read", bytesread, "bytes, with target", bytesToRead) + + if err != nil { + if err != io.EOF { + return err + } + // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error()) + break + } + + if int64(bytesread) > bytesToRead { + bytesread = int(bytesToRead) + } + err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{ + Data: buffer[:bytesread], + }) + if err != nil { + // println("sending", bytesread, "bytes err", err.Error()) + return err + } + + bytesToRead -= int64(bytesread) + + } + + return nil + +} diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 667131e9f..698bad5b8 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -69,12 +69,11 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { - blockSizeLimit := 1024 * 1024 * 2 isLastChunk := false // need to send body by chunks - for i := 0; i < len(needleBody); i += blockSizeLimit { - stopOffset := i + blockSizeLimit + for i := 0; i < len(needleBody); i += BufferSizeLimit { + stopOffset := i + BufferSizeLimit if stopOffset >= len(needleBody) { isLastChunk = true stopOffset = len(needleBody) |
