aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-28 12:06:58 -0800
committerchrislu <chris.lu@gmail.com>2024-01-28 12:06:58 -0800
commitb51dfe2bffba5f8c3e699a9594e935cdefc8ede5 (patch)
tree00d798e401d90e720b60d4c8be81e72fe5947b57
parentf8c55f101e2e3f85bc84da703de37a9c22a48bc3 (diff)
downloadseaweedfs-b51dfe2bffba5f8c3e699a9594e935cdefc8ede5.tar.xz
seaweedfs-b51dfe2bffba5f8c3e699a9594e935cdefc8ede5.zip
wait for publishing clients
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go7
-rw-r--r--weed/mq/client/pub_client/scheduler.go14
2 files changed, 16 insertions, 5 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 3ac037973..419f68f42 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -43,18 +43,21 @@ func main() {
CreateTopicPartitionCount: int32(*partitionCount),
}
publisher := pub_client.NewTopicPublisher(*namespace, *topic, config)
+ wg := sync.WaitGroup{}
+ wg.Add(1)
go func() {
brokers := strings.Split(*seedBrokers, ",")
- if err := publisher.StartSchedulerThread(brokers); err != nil {
+ if err := publisher.StartSchedulerThread(brokers, &wg); err != nil {
fmt.Println(err)
return
}
}()
+ wg.Wait()
+
startTime := time.Now()
// Start multiple publishers
- var wg sync.WaitGroup
for i := 0; i < *concurrency; i++ {
wg.Add(1)
go func(id int) {
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index e617af09f..9d02d5f7b 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -28,7 +28,7 @@ type EachPartitionPublishJob struct {
generation int
inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
}
-func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
+func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error {
if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
@@ -39,10 +39,10 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
generation := 0
var errChan chan EachPartitionError
for {
- glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation, p.namespace, p.topic)
+ glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation+1, p.namespace, p.topic)
if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil {
generation++
- glog.V(0).Infof("start generation %d", generation)
+ glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
if errChan == nil {
errChan = make(chan EachPartitionError, len(assignments))
}
@@ -53,6 +53,10 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
continue
}
+ if generation == 1 {
+ wg.Done()
+ }
+
// wait for any error to happen. If so, consume all remaining errors, and retry
for {
select {
@@ -237,6 +241,10 @@ func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (ass
return err
}
+ if len(lookupResp.BrokerPartitionAssignments) == 0 {
+ return fmt.Errorf("no broker partition assignments")
+ }
+
assignments = lookupResp.BrokerPartitionAssignments
return nil