aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-28 12:30:08 -0800
committerchrislu <chris.lu@gmail.com>2024-01-28 12:30:08 -0800
commitdedfd31dfb4e467f647017f3a6778b6492eca879 (patch)
tree612dca5b1c7e934a7fedffd6bbbe37a331fee5fe /weed/mq/client/pub_client
parentcbf750a31ff02a120450059c8724c49b1a0c05e7 (diff)
downloadseaweedfs-dedfd31dfb4e467f647017f3a6778b6492eca879.tar.xz
seaweedfs-dedfd31dfb4e467f647017f3a6778b6492eca879.zip
refactor
Diffstat (limited to 'weed/mq/client/pub_client')
-rw-r--r--weed/mq/client/pub_client/publisher.go17
-rw-r--r--weed/mq/client/pub_client/scheduler.go18
2 files changed, 25 insertions, 10 deletions
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 1ffbeea46..68082a70f 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
+ "log"
"sync"
"time"
)
@@ -16,6 +17,7 @@ type PublisherConfiguration struct {
Topic topic.Topic
CreateTopic bool
CreateTopicPartitionCount int32
+ Brokers []string
}
type PublishClient struct {
@@ -32,13 +34,26 @@ type TopicPublisher struct {
}
func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher {
- return &TopicPublisher{
+ tp := &TopicPublisher{
partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int {
return int(a - b)
}),
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
config: config,
}
+
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ if err := tp.StartSchedulerThread(&wg); err != nil {
+ log.Println(err)
+ return
+ }
+ }()
+
+ wg.Wait()
+
+ return tp
}
func (p *TopicPublisher) Shutdown() error {
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index 2b9f186e1..12cbe303d 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -28,9 +28,9 @@ type EachPartitionPublishJob struct {
generation int
inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
}
-func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error {
+func (p *TopicPublisher) StartSchedulerThread(wg *sync.WaitGroup) error {
- if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
+ if err := p.doEnsureConfigureTopic(); err != nil {
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
}
@@ -40,7 +40,7 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *syn
var errChan chan EachPartitionError
for {
glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
- if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil {
+ if assignments, err := p.doLookupTopicPartitions(); err == nil {
generation++
glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
if errChan == nil {
@@ -183,12 +183,12 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return nil
}
-func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err error) {
- if len(bootstrapBrokers) == 0 {
+func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
+ if len(p.config.Brokers) == 0 {
return fmt.Errorf("no bootstrap brokers")
}
var lastErr error
- for _, brokerAddress := range bootstrapBrokers {
+ for _, brokerAddress := range p.config.Brokers {
err = pb.WithBrokerGrpcClient(false,
brokerAddress,
p.grpcDialOption,
@@ -212,12 +212,12 @@ func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err
return nil
}
-func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
- if len(bootstrapBrokers) == 0 {
+func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
+ if len(p.config.Brokers) == 0 {
return nil, fmt.Errorf("no bootstrap brokers")
}
var lastErr error
- for _, brokerAddress := range bootstrapBrokers {
+ for _, brokerAddress := range p.config.Brokers {
err := pb.WithBrokerGrpcClient(false,
brokerAddress,
p.grpcDialOption,