diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/command/msg_broker.go | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/command/msg_broker.go')
| -rw-r--r-- | weed/command/msg_broker.go | 19 |
1 files changed, 7 insertions, 12 deletions
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index db0b4148d..3274f599b 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -3,7 +3,6 @@ package command import ( "context" "fmt" - "strconv" "time" "google.golang.org/grpc/reflection" @@ -63,35 +62,31 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*msgBrokerOpt.filer) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") cipher := false for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } cipher = resp.Cipher return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) break } } qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ - Filers: []string{*msgBrokerOpt.filer}, + Filers: []pb.ServerAddress{filerAddress}, DefaultReplication: "", MaxMB: 0, Ip: *msgBrokerOpt.ip, @@ -100,7 +95,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { }, grpcDialOption) // start grpc listener - grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) + grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) } |
