diff options
Diffstat (limited to 'weed/server/volume_grpc_tail.go')
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 87db6e146..16154c9cc 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -1,16 +1,18 @@ package weed_server import ( + "context" "fmt" "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" ) -func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error { +func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error { v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { @@ -50,7 +52,7 @@ func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stre } -func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { +func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs) if err != nil { @@ -61,7 +63,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v * if isLastOne { // need to heart beat to the client to ensure the connection health - sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true}) + sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true}) return lastTimestampNs, sendErr } @@ -78,7 +80,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v * stopOffset = len(needleBody) } - sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{ + sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{ NeedleHeader: needleHeader, NeedleBody: needleBody[i:stopOffset], IsLastChunk: isLastChunk, @@ -96,3 +98,21 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v * return } + +func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) { + + resp := &volume_server_pb.VolumeTailReceiverResponse{} + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId) + } + + defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) + + return resp, operation.TailVolumeFromServer(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.DrainingSeconds), func(n *needle.Needle) error { + _, err := vs.store.Write(v.Id, n) + return err + }) + +} |
