aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker/broker_server.go')
-rw-r--r--weed/messaging/broker/broker_server.go8
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index e264f2e56..fd41dd441 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -50,20 +50,20 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- stream, err := client.SubscribeVolumeLocationUpdates(ctx)
+ stream, err := client.KeepConnected(ctx)
if err != nil {
glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
- initRequest := &filer_pb.SubscribeVolumeLocationUpdatesRequest{
+ initRequest := &filer_pb.KeepConnectedRequest{
Name: broker.option.Ip,
GrpcPort: uint32(broker.option.Port),
}
for _, tp := range broker.topicManager.ListTopicPartitions() {
initRequest.Resources = append(initRequest.Resources, tp.String())
}
- if err := stream.Send(&filer_pb.SubscribeVolumeLocationUpdatesRequest{
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
Name: broker.option.Ip,
GrpcPort: uint32(broker.option.Port),
}); err != nil {
@@ -75,7 +75,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
glog.V(0).Infof("conntected with filer: %v", filer)
for {
- if err := stream.Send(&filer_pb.SubscribeVolumeLocationUpdatesRequest{
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
Name: broker.option.Ip,
GrpcPort: uint32(broker.option.Port),
}); err != nil {