aboutsummaryrefslogtreecommitdiff
path: root/weed/command/msg_broker.go
diff options
context:
space:
mode:
authoryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
committeryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
commit46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree734125b48b6d96f8796a2b89b924312cd169ef0e /weed/command/msg_broker.go
parenta5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parentdc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
downloadseaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz
seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip
Update tikv client version and add one PC support
Diffstat (limited to 'weed/command/msg_broker.go')
-rw-r--r--weed/command/msg_broker.go19
1 files changed, 7 insertions, 12 deletions
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index db0b4148d..3274f599b 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -3,7 +3,6 @@ package command
import (
"context"
"fmt"
- "strconv"
"time"
"google.golang.org/grpc/reflection"
@@ -63,35 +62,31 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
- filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
+ filerAddress := pb.ServerAddress(*msgBrokerOpt.filer)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
cipher := false
for {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second)
} else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
break
}
}
qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
- Filers: []string{*msgBrokerOpt.filer},
+ Filers: []pb.ServerAddress{filerAddress},
DefaultReplication: "",
MaxMB: 0,
Ip: *msgBrokerOpt.ip,
@@ -100,7 +95,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
}, grpcDialOption)
// start grpc listener
- grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
+ grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
}