aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_server.go')
-rw-r--r--weed/mq/broker/broker_server.go64
1 files changed, 0 insertions, 64 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 36f216a48..63e248797 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,15 +1,11 @@
package broker
import (
- "context"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "time"
-
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -33,7 +29,6 @@ type MessageQueueBroker struct {
option *MessageQueueBrokerOption
grpcDialOption grpc.DialOption
MasterClient *wdclient.MasterClient
- topicManager *TopicManager
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -44,72 +39,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
}
- mqBroker.topicManager = NewTopicManager(mqBroker)
-
mqBroker.checkFilers()
- go mqBroker.keepConnectedToOneFiler()
go mqBroker.MasterClient.KeepConnectedToMaster()
return mqBroker, nil
}
-func (broker *MessageQueueBroker) keepConnectedToOneFiler() {
-
- for {
- for _, filer := range broker.option.Filers {
- broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.KeepConnected(ctx)
- if err != nil {
- glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
-
- initRequest := &filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }
- for _, tp := range broker.topicManager.ListTopicPartitions() {
- initRequest.Resources = append(initRequest.Resources, tp.String())
- }
- if err := stream.Send(&filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }); err != nil {
- glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
-
- // TODO send events of adding/removing topics
-
- glog.V(0).Infof("conntected with filer: %v", filer)
- for {
- if err := stream.Send(&filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }); err != nil {
- glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
- // println("send heartbeat")
- if _, err := stream.Recv(); err != nil {
- glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
- // println("received reply")
- time.Sleep(11 * time.Second)
- // println("woke up")
- }
- return nil
- })
- time.Sleep(3 * time.Second)
- }
- }
-
-}
-
func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)