diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-04-18 19:22:13 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-04-18 19:22:13 -0700 |
| commit | fa176fe80f77e569d03b2975a5c15cc213f37ea4 (patch) | |
| tree | b3dea937371004d3a813f4b1f680a40a632b7230 /weed/server | |
| parent | 3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6 (diff) | |
| download | seaweedfs-fa176fe80f77e569d03b2975a5c15cc213f37ea4.tar.xz seaweedfs-fa176fe80f77e569d03b2975a5c15cc213f37ea4.zip | |
volume tailing chunks large files
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_tail.go (renamed from weed/server/volume_grpc_stream_follow.go) | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/weed/server/volume_grpc_stream_follow.go b/weed/server/volume_grpc_tail.go index 52a9c1473..da248498f 100644 --- a/weed/server/volume_grpc_stream_follow.go +++ b/weed/server/volume_grpc_tail.go @@ -30,6 +30,7 @@ func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stre time.Sleep(2 * time.Second) if req.DrainingSeconds == 0 { + lastTimestampNs = lastProcessedTimestampNs continue } if lastProcessedTimestampNs == lastTimestampNs { @@ -59,18 +60,31 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v * if isLastOne { // need to heart beat to the client to ensure the connection health - sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{}) + sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true}) return lastTimestampNs, sendErr } err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { - sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{ - NeedleHeader: needleHeader, - NeedleBody: needleBody, - }) - if sendErr != nil { - return sendErr + blockSizeLimit := 1024 * 1024 * 2 + isLastChunk := false + + // need to send body by chunks + for i := 0; i < len(needleBody); i += blockSizeLimit { + stopOffset := i + blockSizeLimit + if stopOffset >= len(needleBody) { + isLastChunk = true + stopOffset = len(needleBody) + } + + sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody[i:stopOffset], + IsLastChunk: isLastChunk, + }) + if sendErr != nil { + return sendErr + } } lastProcessedTimestampNs = needleAppendAtNs |
