diff options
Diffstat (limited to 'weed/messaging/broker/broker_server.go')
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 76 |
1 files changed, 24 insertions, 52 deletions
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 29c227274..9cad27214 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -16,6 +16,7 @@ type MessageBrokerOption struct { Filers []string DefaultReplication string MaxMB int + Ip string Port int Cipher bool } @@ -37,73 +38,44 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio messageBroker.checkPeers() - // go messageBroker.loopForEver() + go messageBroker.keepConnectedToOneFiler() return messageBroker, nil } -func (broker *MessageBroker) loopForEver() { +func (broker *MessageBroker) keepConnectedToOneFiler() { for { - broker.checkPeers() - time.Sleep(3 * time.Second) - } - -} - -func (broker *MessageBroker) checkPeers() { - - // contact a filer about masters - var masters []string - found := false - for !found { for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + stream, err := client.KeepConnected(context.Background()) if err != nil { + glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err) return err } - masters = append(masters, resp.Masters...) - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received master list: %s", masters) - - // contact each masters for filers - var filers []string - found = false - for !found { - for _, master := range masters { - err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { - resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ - ClientType: "filer", - }) - if err != nil { - return err + glog.V(0).Infof("conntected with filer: %v", filer) + for { + if err := stream.Send(&filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + }); err != nil { + glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + // println("send heartbeat") + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + // println("received reply") + time.Sleep(11*time.Second) + // println("woke up") } - - filers = append(filers, resp.GrpcAddresses...) - return nil }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to list filers: %v", err) - time.Sleep(time.Second) + time.Sleep(3*time.Second) } } - glog.V(0).Infof("received filer list: %s", filers) - - broker.option.Filers = filers } |
