aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go')
-rw-r--r--test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go69
1 files changed, 69 insertions, 0 deletions
diff --git a/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go b/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go
new file mode 100644
index 000000000..1da40c89f
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go
@@ -0,0 +1,69 @@
+package main
+
+import (
+ "context"
+ "log"
+ "os"
+ "time"
+
+ "github.com/segmentio/kafka-go"
+)
+
+func main() {
+ if len(os.Args) < 3 {
+ log.Fatal("Usage: kafka-go-consumer <broker> <topic>")
+ }
+ broker := os.Args[1]
+ topic := os.Args[2]
+
+ log.Printf("Connecting to Kafka broker: %s", broker)
+ log.Printf("Topic: %s", topic)
+
+ // Create a new reader
+ r := kafka.NewReader(kafka.ReaderConfig{
+ Brokers: []string{broker},
+ Topic: topic,
+ GroupID: "kafka-go-test-group",
+ MinBytes: 1,
+ MaxBytes: 10e6, // 10MB
+ MaxWait: 1 * time.Second,
+ })
+ defer r.Close()
+
+ log.Printf("Starting to consume messages...")
+
+ ctx := context.Background()
+ messageCount := 0
+ errorCount := 0
+ startTime := time.Now()
+
+ for {
+ m, err := r.ReadMessage(ctx)
+ if err != nil {
+ errorCount++
+ log.Printf("Error reading message #%d: %v", messageCount+1, err)
+
+ // Stop after 10 consecutive errors or 60 seconds
+ if errorCount > 10 || time.Since(startTime) > 60*time.Second {
+ log.Printf("\nStopping after %d errors in %v", errorCount, time.Since(startTime))
+ break
+ }
+ continue
+ }
+
+ // Reset error count on successful read
+ errorCount = 0
+ messageCount++
+
+ log.Printf("Message #%d: topic=%s partition=%d offset=%d key=%s value=%s",
+ messageCount, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
+
+ // Stop after 100 messages or 60 seconds
+ if messageCount >= 100 || time.Since(startTime) > 60*time.Second {
+ log.Printf("\nSuccessfully consumed %d messages in %v", messageCount, time.Since(startTime))
+ log.Printf("Success rate: %.1f%% (%d/%d including errors)",
+ float64(messageCount)/float64(messageCount+errorCount)*100, messageCount, messageCount+errorCount)
+ break
+ }
+ }
+}