aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/tail_volume.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/tail_volume.go')
-rw-r--r--weed/operation/tail_volume.go11
1 files changed, 7 insertions, 4 deletions
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index b53f18ce1..045948274 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -5,14 +5,15 @@ import (
"fmt"
"io"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "google.golang.org/grpc"
)
-func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
+func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
// find volume location, replication, ttl info
- lookup, err := Lookup(master, vid.String())
+ lookup, err := Lookup(masterFn, vid.String())
if err != nil {
return fmt.Errorf("look up volume %d: %v", vid, err)
}
@@ -27,8 +28,10 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume
func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
- stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
+ stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
VolumeId: uint32(vid),
SinceNs: sinceNs,
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),