aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-18 19:22:13 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-18 19:22:13 -0700
commitfa176fe80f77e569d03b2975a5c15cc213f37ea4 (patch)
treeb3dea937371004d3a813f4b1f680a40a632b7230 /weed/server
parent3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6 (diff)
downloadseaweedfs-fa176fe80f77e569d03b2975a5c15cc213f37ea4.tar.xz
seaweedfs-fa176fe80f77e569d03b2975a5c15cc213f37ea4.zip
volume tailing chunks large files
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/volume_grpc_tail.go (renamed from weed/server/volume_grpc_stream_follow.go)28
1 files changed, 21 insertions, 7 deletions
diff --git a/weed/server/volume_grpc_stream_follow.go b/weed/server/volume_grpc_tail.go
index 52a9c1473..da248498f 100644
--- a/weed/server/volume_grpc_stream_follow.go
+++ b/weed/server/volume_grpc_tail.go
@@ -30,6 +30,7 @@ func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stre
time.Sleep(2 * time.Second)
if req.DrainingSeconds == 0 {
+ lastTimestampNs = lastProcessedTimestampNs
continue
}
if lastProcessedTimestampNs == lastTimestampNs {
@@ -59,18 +60,31 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *
if isLastOne {
// need to heart beat to the client to ensure the connection health
- sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{})
+ sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true})
return lastTimestampNs, sendErr
}
err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
- sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
- NeedleHeader: needleHeader,
- NeedleBody: needleBody,
- })
- if sendErr != nil {
- return sendErr
+ blockSizeLimit := 1024 * 1024 * 2
+ isLastChunk := false
+
+ // need to send body by chunks
+ for i := 0; i < len(needleBody); i += blockSizeLimit {
+ stopOffset := i + blockSizeLimit
+ if stopOffset >= len(needleBody) {
+ isLastChunk = true
+ stopOffset = len(needleBody)
+ }
+
+ sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
+ NeedleHeader: needleHeader,
+ NeedleBody: needleBody[i:stopOffset],
+ IsLastChunk: isLastChunk,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
}
lastProcessedTimestampNs = needleAppendAtNs