diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-28 12:06:58 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-28 12:06:58 -0800 |
| commit | b51dfe2bffba5f8c3e699a9594e935cdefc8ede5 (patch) | |
| tree | 00d798e401d90e720b60d4c8be81e72fe5947b57 | |
| parent | f8c55f101e2e3f85bc84da703de37a9c22a48bc3 (diff) | |
| download | seaweedfs-b51dfe2bffba5f8c3e699a9594e935cdefc8ede5.tar.xz seaweedfs-b51dfe2bffba5f8c3e699a9594e935cdefc8ede5.zip | |
wait for publishing clients
| -rw-r--r-- | weed/mq/client/cmd/weed_pub/publisher.go | 7 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 14 |
2 files changed, 16 insertions, 5 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index 3ac037973..419f68f42 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -43,18 +43,21 @@ func main() { CreateTopicPartitionCount: int32(*partitionCount), } publisher := pub_client.NewTopicPublisher(*namespace, *topic, config) + wg := sync.WaitGroup{} + wg.Add(1) go func() { brokers := strings.Split(*seedBrokers, ",") - if err := publisher.StartSchedulerThread(brokers); err != nil { + if err := publisher.StartSchedulerThread(brokers, &wg); err != nil { fmt.Println(err) return } }() + wg.Wait() + startTime := time.Now() // Start multiple publishers - var wg sync.WaitGroup for i := 0; i < *concurrency; i++ { wg.Add(1) go func(id int) { diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index e617af09f..9d02d5f7b 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -28,7 +28,7 @@ type EachPartitionPublishJob struct { generation int inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage] } -func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error { +func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error { if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil { return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err) @@ -39,10 +39,10 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error { generation := 0 var errChan chan EachPartitionError for { - glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation, p.namespace, p.topic) + glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation+1, p.namespace, p.topic) if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil { generation++ - glog.V(0).Infof("start generation %d", generation) + glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments)) if errChan == nil { errChan = make(chan EachPartitionError, len(assignments)) } @@ -53,6 +53,10 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error { continue } + if generation == 1 { + wg.Done() + } + // wait for any error to happen. If so, consume all remaining errors, and retry for { select { @@ -237,6 +241,10 @@ func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (ass return err } + if len(lookupResp.BrokerPartitionAssignments) == 0 { + return fmt.Errorf("no broker partition assignments") + } + assignments = lookupResp.BrokerPartitionAssignments return nil |
