aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_tail.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_tail.go')
-rw-r--r--weed/server/volume_grpc_tail.go28
1 files changed, 24 insertions, 4 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 87db6e146..16154c9cc 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -1,16 +1,18 @@
package weed_server
import (
+ "context"
"fmt"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
-func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
+func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
@@ -50,7 +52,7 @@ func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stre
}
-func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
+func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
if err != nil {
@@ -61,7 +63,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *
if isLastOne {
// need to heart beat to the client to ensure the connection health
- sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true})
+ sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
return lastTimestampNs, sendErr
}
@@ -78,7 +80,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *
stopOffset = len(needleBody)
}
- sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
+ sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{
NeedleHeader: needleHeader,
NeedleBody: needleBody[i:stopOffset],
IsLastChunk: isLastChunk,
@@ -96,3 +98,21 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *
return
}
+
+func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
+
+ resp := &volume_server_pb.VolumeTailReceiverResponse{}
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId)
+ }
+
+ defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
+
+ return resp, operation.TailVolumeFromServer(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.DrainingSeconds), func(n *needle.Needle) error {
+ _, err := vs.store.Write(v.Id, n)
+ return err
+ })
+
+}