diff options
| author | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
| commit | 9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch) | |
| tree | 1e897171c804e63ba6edef4778ea8b243f2ad8d6 /weed/operation | |
| parent | c935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff) | |
| download | seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip | |
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 4 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 2 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 8 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 2 | ||||
| -rw-r--r-- | weed/operation/sync_volume.go | 2 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 2 |
6 files changed, 10 insertions, 10 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index f0f7581f3..b716300e2 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -48,7 +48,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest continue } - lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, @@ -105,7 +105,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) { - WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index d762f51e1..996c0b29e 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -123,7 +123,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ FileIds: fileIds, diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 743682203..9b68d2286 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -8,18 +8,18 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) -func WithVolumeServerClient(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { +func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, volumeServer.ToGrpcAddress(), grpcDialOption) } -func WithMasterServerClient(masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { +func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterServer.ToGrpcAddress(), grpcDialOption) diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index c222dff92..1eb5dd320 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -79,7 +79,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids //only query unknown_vids - err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeOrFileIds: unknown_vids, diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index fdd22ac85..de71a198d 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -9,7 +9,7 @@ import ( func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ VolumeId: vid, diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index bedeeb3b5..d3449873b 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -28,7 +28,7 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle } func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { - return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() |
