aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/publisher.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/publisher.go')
-rw-r--r--weed/mq/client/pub_client/publisher.go26
1 files changed, 19 insertions, 7 deletions
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index bf1711e38..a0c26db36 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -1,8 +1,9 @@
package pub_client
import (
+ "fmt"
"github.com/rdleal/intervalst/interval"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -11,6 +12,8 @@ import (
)
type PublisherConfiguration struct {
+ CreateTopic bool
+ CreateTopicPartitionCount int32
}
type PublishClient struct {
@@ -24,9 +27,10 @@ type TopicPublisher struct {
partition2Broker *interval.SearchTree[*PublishClient, int32]
grpcDialOption grpc.DialOption
sync.Mutex // protects grpc
+ config *PublisherConfiguration
}
-func NewTopicPublisher(namespace, topic string) *TopicPublisher {
+func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) *TopicPublisher {
return &TopicPublisher{
namespace: namespace,
topic: topic,
@@ -34,19 +38,27 @@ func NewTopicPublisher(namespace, topic string) *TopicPublisher {
return int(a - b)
}),
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ config: config,
}
}
-func (p *TopicPublisher) Connect(bootstrapBroker string) error {
- if err := p.doLookup(bootstrapBroker); err != nil {
- return err
+func (p *TopicPublisher) Connect(bootstrapBrokers []string) (err error) {
+ if len(bootstrapBrokers) == 0 {
+ return nil
}
- return nil
+ for _, b := range bootstrapBrokers {
+ err = p.doLookupAndConnect(b)
+ if err == nil {
+ return nil
+ }
+ fmt.Printf("failed to connect to %s: %v\n\n", b, err)
+ }
+ return err
}
func (p *TopicPublisher) Shutdown() error {
- if clients, found := p.partition2Broker.AllIntersections(0, balancer.MaxPartitionCount); found {
+ if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, client := range clients {
client.CloseSend()
}