aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/grpc_client_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/pb/grpc_client_server.go')
-rw-r--r--weed/pb/grpc_client_server.go26
1 files changed, 17 insertions, 9 deletions
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index c7cb82a22..a78ed0ca4 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -3,9 +3,9 @@ package pb
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"math/rand"
"net/http"
"strconv"
@@ -16,9 +16,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
const (
@@ -70,7 +70,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr
// opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
var options []grpc.DialOption
options = append(options,
- // grpc.WithInsecure(),
+ // grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(Max_Message_Size),
grpc.MaxCallRecvMsgSize(Max_Message_Size),
@@ -216,6 +216,14 @@ func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpc
}
+func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error {
+ return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
+ client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ return fn(client)
+ }, broker.ToGrpcAddress(), grpcDialOption)
+
+}
+
func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
for _, masterGrpcAddress := range masterGrpcAddresses {
@@ -231,10 +239,10 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri
return err
}
-func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
- client := messaging_pb.NewSeaweedMessagingClient(grpcConnection)
+ client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
}, brokerGrpcAddress, grpcDialOption)