aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker/broker_server.go')
-rw-r--r--weed/messaging/broker/broker_server.go76
1 files changed, 24 insertions, 52 deletions
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 29c227274..9cad27214 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -16,6 +16,7 @@ type MessageBrokerOption struct {
Filers []string
DefaultReplication string
MaxMB int
+ Ip string
Port int
Cipher bool
}
@@ -37,73 +38,44 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
messageBroker.checkPeers()
- // go messageBroker.loopForEver()
+ go messageBroker.keepConnectedToOneFiler()
return messageBroker, nil
}
-func (broker *MessageBroker) loopForEver() {
+func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
- broker.checkPeers()
- time.Sleep(3 * time.Second)
- }
-
-}
-
-func (broker *MessageBroker) checkPeers() {
-
- // contact a filer about masters
- var masters []string
- found := false
- for !found {
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.KeepConnected(context.Background())
if err != nil {
+ glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
- masters = append(masters, resp.Masters...)
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received master list: %s", masters)
-
- // contact each masters for filers
- var filers []string
- found = false
- for !found {
- for _, master := range masters {
- err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
- ClientType: "filer",
- })
- if err != nil {
- return err
+ glog.V(0).Infof("conntected with filer: %v", filer)
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }); err != nil {
+ glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("send heartbeat")
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("received reply")
+ time.Sleep(11*time.Second)
+ // println("woke up")
}
-
- filers = append(filers, resp.GrpcAddresses...)
-
return nil
})
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to list filers: %v", err)
- time.Sleep(time.Second)
+ time.Sleep(3*time.Second)
}
}
- glog.V(0).Infof("received filer list: %s", filers)
-
- broker.option.Filers = filers
}