aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go4
-rw-r--r--weed/operation/delete_content.go2
-rw-r--r--weed/operation/grpc_client.go8
-rw-r--r--weed/operation/lookup.go2
-rw-r--r--weed/operation/sync_volume.go2
-rw-r--r--weed/operation/tail_volume.go2
6 files changed, 10 insertions, 10 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index f0f7581f3..b716300e2 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -48,7 +48,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
- lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
@@ -105,7 +105,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
- WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index d762f51e1..996c0b29e 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -123,7 +123,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index 743682203..9b68d2286 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -8,18 +8,18 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
-func WithVolumeServerClient(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
+func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, volumeServer.ToGrpcAddress(), grpcDialOption)
}
-func WithMasterServerClient(masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
+func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterServer.ToGrpcAddress(), grpcDialOption)
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index c222dff92..1eb5dd320 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -79,7 +79,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
//only query unknown_vids
- err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeOrFileIds: unknown_vids,
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index fdd22ac85..de71a198d 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -9,7 +9,7 @@ import (
func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
VolumeId: vid,
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index bedeeb3b5..d3449873b 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -28,7 +28,7 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle
}
func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
- return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()