From 9f9ef1340c6441c10c15e2642b5074d34fe40332 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 26 Dec 2021 00:15:03 -0800 Subject: use streaming mode for long poll grpc calls streaming mode would create separate grpc connections for each call. this is to ensure the long poll connections are properly closed. --- unmaintained/diff_volume_servers/diff_volume_servers.go | 4 ++-- .../load_test/load_test_meta_tail/load_test_meta_tail.go | 2 +- unmaintained/stream_read_volume/stream_read_volume.go | 11 +++++------ 3 files changed, 8 insertions(+), 9 deletions(-) (limited to 'unmaintained') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index e8361a6cf..0188d18d4 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -122,7 +122,7 @@ type needleState struct { func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleState, int64, error) { var idxFile *bytes.Reader - err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ @@ -180,7 +180,7 @@ func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleS func getNeedleFileId(v uint32, nid types.NeedleId, addr pb.ServerAddress) (string, error) { var id string - err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ VolumeId: v, NeedleId: uint64(nid), diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index 8ccad1e49..faaea9e8d 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -51,7 +51,7 @@ func main() { } func startGenerateMetadata() { - pb.WithFilerClient(pb.ServerAddress(*tailFiler), grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, pb.ServerAddress(*tailFiler), grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error { for i := 0; i < *n; i++ { name := fmt.Sprintf("file%d", i) diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go index e120b9920..bbe5abedb 100644 --- a/unmaintained/stream_read_volume/stream_read_volume.go +++ b/unmaintained/stream_read_volume/stream_read_volume.go @@ -15,9 +15,9 @@ import ( ) var ( - volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server") - volumeId = flag.Int("volumeId", -1, "a volume id to stream read") - grpcDialOption grpc.DialOption + volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server") + volumeId = flag.Int("volumeId", -1, "a volume id to stream read") + grpcDialOption grpc.DialOption ) func main() { @@ -33,11 +33,11 @@ func main() { return nil } - err := operation.WithVolumeServerClient(pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{ - VolumeIds: []uint32{vid}, + VolumeIds: []uint32{vid}, }) if err != nil { return err @@ -61,4 +61,3 @@ func main() { } } - -- cgit v1.2.3