aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-05-27 11:59:03 -0700
committerChris Lu <chris.lu@gmail.com>2019-05-27 11:59:03 -0700
commitb4b407e4038943ca5b7dc440d2848f23c11b73ca (patch)
treef90a49fa2cac3361efa224b13c0ecfaade054b76 /weed/server
parenta4f3d82c57bca13321dca257891836ff36c7eca5 (diff)
downloadseaweedfs-b4b407e4038943ca5b7dc440d2848f23c11b73ca.tar.xz
seaweedfs-b4b407e4038943ca5b7dc440d2848f23c11b73ca.zip
add grpc ec shard read
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/volume_grpc_copy.go5
-rw-r--r--weed/server/volume_grpc_erasure_coding.go59
-rw-r--r--weed/server/volume_grpc_tail.go5
3 files changed, 58 insertions, 11 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 7b681aa53..e5a3d6edf 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -16,6 +16,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+const BufferSizeLimit = 1024 * 1024 * 2
+
// VolumeCopy copy the .idx .dat files, and mount the volume
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
@@ -190,7 +192,6 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
bytesToRead := int64(req.StopOffset)
- const BufferSize = 1024 * 1024 * 2
var fileName = v.FileName() + req.Ext
file, err := os.Open(fileName)
if err != nil {
@@ -198,7 +199,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
}
defer file.Close()
- buffer := make([]byte, BufferSize)
+ buffer := make([]byte, BufferSizeLimit)
for bytesToRead > 0 {
bytesread, err := file.Read(buffer)
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index f82b07e29..aa0f80442 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "io"
"math"
"os"
@@ -71,8 +72,8 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
}
// copy ec data slices
- for _, ecIndex := range req.EcIndexes {
- if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(ecIndex))); err != nil {
+ for _, shardId := range req.ShardIds {
+ if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil {
return err
}
}
@@ -95,8 +96,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
}
baseFileName := v.FileName()
- for _, shardIndex := range req.EcIndexes {
- if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardIndex))); err != nil {
+ for _, shardId := range req.ShardIds {
+ if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardId))); err != nil {
return nil, err
}
}
@@ -112,7 +113,7 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
- for _, shardId := range req.EcIndexes {
+ for _, shardId := range req.ShardIds {
err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
if err != nil {
@@ -131,7 +132,7 @@ func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_ser
func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
- for _, shardId := range req.EcIndexes {
+ for _, shardId := range req.ShardIds {
err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
if err != nil {
@@ -147,3 +148,49 @@ func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_s
return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
}
+
+func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
+
+ ecShards, found := vs.store.HasEcShard(needle.VolumeId(req.VolumeId))
+ if !found {
+ return fmt.Errorf("not found ec volume id %d", req.VolumeId)
+ }
+ ecShard, found := ecShards.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
+ if !found {
+ return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
+ }
+
+ buffer := make([]byte, BufferSizeLimit)
+ startOffset, bytesToRead := req.Offset, req.Size
+
+ for bytesToRead > 0 {
+ bytesread, err := ecShard.ReadAt(buffer, startOffset)
+
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error())
+ break
+ }
+
+ if int64(bytesread) > bytesToRead {
+ bytesread = int(bytesToRead)
+ }
+ err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
+ Data: buffer[:bytesread],
+ })
+ if err != nil {
+ // println("sending", bytesread, "bytes err", err.Error())
+ return err
+ }
+
+ bytesToRead -= int64(bytesread)
+
+ }
+
+ return nil
+
+}
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 667131e9f..698bad5b8 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -69,12 +69,11 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
- blockSizeLimit := 1024 * 1024 * 2
isLastChunk := false
// need to send body by chunks
- for i := 0; i < len(needleBody); i += blockSizeLimit {
- stopOffset := i + blockSizeLimit
+ for i := 0; i < len(needleBody); i += BufferSizeLimit {
+ stopOffset := i + BufferSizeLimit
if stopOffset >= len(needleBody) {
isLastChunk = true
stopOffset = len(needleBody)