aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-10-13 19:43:09 -0700
committerchrislu <chris.lu@gmail.com>2025-10-13 19:43:09 -0700
commitfba4fc3a7dc54576b735110739093a37f184b0f9 (patch)
treea8470effdce17ccc73b40939ba5bb9cb7310f0ff
parente00c6ca9499277891faa78aaba7ec85eeecf4ed7 (diff)
downloadseaweedfs-fba4fc3a7dc54576b735110739093a37f184b0f9.tar.xz
seaweedfs-fba4fc3a7dc54576b735110739093a37f184b0f9.zip
All consumers share the same group for load balancing across partitions
-rw-r--r--test/kafka/kafka-client-loadtest/internal/consumer/consumer.go7
1 files changed, 4 insertions, 3 deletions
diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
index e1c4caa41..1171bd5c0 100644
--- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -39,7 +39,8 @@ type Consumer struct {
// New creates a new consumer instance
func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) {
- consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id)
+ // All consumers share the same group for load balancing across partitions
+ consumerGroup := cfg.Consumers.GroupPrefix
c := &Consumer{
id: id,
@@ -226,9 +227,9 @@ func (c *Consumer) runSaramaConsumer(ctx context.Context) {
log.Printf("Consumer %d: Error consuming: %v", c.id, err)
c.metricsCollector.RecordConsumerError()
- // Wait before retrying
+ // Wait briefly before retrying (reduced from 5s to 1s for faster recovery)
select {
- case <-time.After(5 * time.Second):
+ case <-time.After(1 * time.Second):
case <-ctx.Done():
return
}