diff options
Diffstat (limited to 'weed/mq/client/pub_client/publisher.go')
| -rw-r--r-- | weed/mq/client/pub_client/publisher.go | 26 |
1 files changed, 19 insertions, 7 deletions
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index bf1711e38..a0c26db36 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -1,8 +1,9 @@ package pub_client import ( + "fmt" "github.com/rdleal/intervalst/interval" - "github.com/seaweedfs/seaweedfs/weed/mq/balancer" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -11,6 +12,8 @@ import ( ) type PublisherConfiguration struct { + CreateTopic bool + CreateTopicPartitionCount int32 } type PublishClient struct { @@ -24,9 +27,10 @@ type TopicPublisher struct { partition2Broker *interval.SearchTree[*PublishClient, int32] grpcDialOption grpc.DialOption sync.Mutex // protects grpc + config *PublisherConfiguration } -func NewTopicPublisher(namespace, topic string) *TopicPublisher { +func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) *TopicPublisher { return &TopicPublisher{ namespace: namespace, topic: topic, @@ -34,19 +38,27 @@ func NewTopicPublisher(namespace, topic string) *TopicPublisher { return int(a - b) }), grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + config: config, } } -func (p *TopicPublisher) Connect(bootstrapBroker string) error { - if err := p.doLookup(bootstrapBroker); err != nil { - return err +func (p *TopicPublisher) Connect(bootstrapBrokers []string) (err error) { + if len(bootstrapBrokers) == 0 { + return nil } - return nil + for _, b := range bootstrapBrokers { + err = p.doLookupAndConnect(b) + if err == nil { + return nil + } + fmt.Printf("failed to connect to %s: %v\n\n", b, err) + } + return err } func (p *TopicPublisher) Shutdown() error { - if clients, found := p.partition2Broker.AllIntersections(0, balancer.MaxPartitionCount); found { + if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found { for _, client := range clients { client.CloseSend() } |
