path: root/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
Diffstat (limited to 'test/kafka/kafka-client-loadtest/internal/consumer/consumer.go')
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/consumer/consumer.go  179
1 file changed, 164 insertions, 15 deletions
diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
index 1171bd5c0..6b23fdfe9 100644
--- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -6,6 +6,8 @@ import (
"encoding/json"
"fmt"
"log"
+ "os"
+ "strings"
"sync"
"time"
@@ -14,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
"google.golang.org/protobuf/proto"
)
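The new tracker import above is the only view of that package this diff gives; the Record fields come from the tracker.Record literal used in ConsumeClaim further down. A minimal sketch of what such a package could look like follows; the duplicate-counting map is purely an assumption, not the real implementation.

// Hypothetical sketch of the record-tracking package this consumer wires in.
// Field names mirror the tracker.Record literal used in ConsumeClaim; the
// bookkeeping map is an assumption for illustration only.
package tracker

import (
	"fmt"
	"sync"
)

// Record describes one consumed message.
type Record struct {
	Key        string
	Topic      string
	Partition  int32
	Offset     int64
	Timestamp  int64 // UnixNano of the message timestamp
	ConsumerID int
}

// Tracker counts how often each topic/partition/offset was delivered, so the
// load test can report duplicates and missing records afterwards.
type Tracker struct {
	mu       sync.Mutex
	consumed map[string]int
}

func New() *Tracker {
	return &Tracker{consumed: make(map[string]int)}
}

// TrackConsumed records a delivery; a count above 1 marks a duplicate read.
func (t *Tracker) TrackConsumed(r Record) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.consumed[fmt.Sprintf("%s/%d@%d", r.Topic, r.Partition, r.Offset)]++
}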
@@ -35,10 +38,13 @@ type Consumer struct {
messagesProcessed int64
lastOffset map[string]map[int32]int64
offsetMutex sync.RWMutex
+
+ // Record tracking
+ tracker *tracker.Tracker
}
// New creates a new consumer instance
-func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) {
+func New(cfg *config.Config, collector *metrics.Collector, id int, recordTracker *tracker.Tracker) (*Consumer, error) {
// All consumers share the same group for load balancing across partitions
consumerGroup := cfg.Consumers.GroupPrefix
@@ -51,6 +57,7 @@ func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, e
useConfluent: false, // Use Sarama by default
lastOffset: make(map[string]map[int32]int64),
schemaFormats: make(map[string]string),
+ tracker: recordTracker,
}
// Initialize schema formats for each topic (must match producer logic)
@@ -101,6 +108,9 @@ func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, e
func (c *Consumer) initSaramaConsumer() error {
config := sarama.NewConfig()
+ // Enable Sarama debug logging to diagnose connection issues
+ sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[Sarama Consumer %d] ", c.id), log.LstdFlags)
+
// Consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
@@ -130,9 +140,24 @@ func (c *Consumer) initSaramaConsumer() error {
// This allows Sarama to fetch from multiple partitions in parallel
config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests
+ // Connection retry and timeout configuration
+ config.Net.DialTimeout = 30 * time.Second // Explicit 30s dial timeout (matches the Sarama default)
+ config.Net.ReadTimeout = 30 * time.Second // Explicit 30s read timeout (matches the Sarama default)
+ config.Net.WriteTimeout = 30 * time.Second // Explicit 30s write timeout (matches the Sarama default)
+ config.Metadata.Retry.Max = 5 // Retry metadata fetch up to 5 times
+ config.Metadata.Retry.Backoff = 500 * time.Millisecond
+ config.Metadata.Timeout = 30 * time.Second // Cap metadata requests at 30s
+
// Version
config.Version = sarama.V2_8_0_0
+ // CRITICAL: Set unique ClientID to ensure each consumer gets a unique member ID
+ // Without this, all consumers from the same process get the same member ID and only 1 joins!
+ // Sarama uses ClientID as part of the member ID generation
+ // Use consumer ID directly - no timestamp needed since IDs are already unique per process
+ config.ClientID = fmt.Sprintf("loadtest-consumer-%d", c.id)
+ log.Printf("Consumer %d: Setting Sarama ClientID to: %s", c.id, config.ClientID)
+
// Create consumer group
consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config)
if err != nil {
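Pulled out of the diff context, the connection and identity settings added in this hunk amount to the standalone constructor below. It is a sketch against Sarama's public API; the import path, function name, and the broker/group parameters are assumptions for illustration, not part of this change.

// Sketch of the Sarama consumer-group configuration this hunk sets up.
package loadtest

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama" // older modules use github.com/Shopify/sarama
)

func newGroup(id int, brokers []string, group string) (sarama.ConsumerGroup, error) {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_0_0
	cfg.Consumer.Return.Errors = true
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Same timeout/retry knobs the diff sets explicitly.
	cfg.Net.DialTimeout = 30 * time.Second
	cfg.Net.ReadTimeout = 30 * time.Second
	cfg.Net.WriteTimeout = 30 * time.Second
	cfg.Metadata.Retry.Max = 5
	cfg.Metadata.Retry.Backoff = 500 * time.Millisecond
	cfg.Metadata.Timeout = 30 * time.Second

	// A per-consumer ClientID keeps group member IDs distinct, since the broker
	// derives each member ID from the client ID it sees on JoinGroup.
	cfg.ClientID = fmt.Sprintf("loadtest-consumer-%d", id)
	log.Printf("consumer %d using ClientID %s", id, cfg.ClientID)

	return sarama.NewConsumerGroup(brokers, group, cfg)
}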
@@ -560,28 +585,104 @@ type ConsumerGroupHandler struct {
}
// Setup is run at the beginning of a new session, before ConsumeClaim
-func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
log.Printf("Consumer %d: Consumer group session setup", h.consumer.id)
+
+ // Log the generation ID and member ID for this session
+ log.Printf("Consumer %d: Generation=%d, MemberID=%s",
+ h.consumer.id, session.GenerationID(), session.MemberID())
+
+ // Log all assigned partitions and their starting offsets
+ assignments := session.Claims()
+ totalPartitions := 0
+ for topic, partitions := range assignments {
+ for _, partition := range partitions {
+ totalPartitions++
+ log.Printf("Consumer %d: ASSIGNED %s[%d]",
+ h.consumer.id, topic, partition)
+ }
+ }
+ log.Printf("Consumer %d: Total partitions assigned: %d", h.consumer.id, totalPartitions)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
-func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
- log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id)
+// CRITICAL: Commit all marked offsets before partition reassignment to minimize duplicates
+func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+ log.Printf("Consumer %d: Consumer group session cleanup - committing final offsets before rebalance", h.consumer.id)
+
+ // Commit all marked offsets before releasing partitions
+ // This ensures that when partitions are reassigned to other consumers,
+ // they start from the last processed offset, minimizing duplicate reads
+ session.Commit()
+
+ log.Printf("Consumer %d: Cleanup complete - offsets committed", h.consumer.id)
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages()
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
msgCount := 0
+ topic := claim.Topic()
+ partition := claim.Partition()
+ initialOffset := claim.InitialOffset()
+ lastTrackedOffset := int64(-1)
+ gapCount := 0
+ var gaps []string // Track gap ranges for detailed analysis
+
+ // Log the starting offset for this partition
+ log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)",
+ h.consumer.id, topic, partition, initialOffset, claim.HighWaterMarkOffset())
+
+ startTime := time.Now()
+ lastLogTime := time.Now()
+
for {
select {
case message, ok := <-claim.Messages():
if !ok {
+ elapsed := time.Since(startTime)
+ // Log detailed gap analysis
+ gapSummary := "none"
+ if len(gaps) > 0 {
+ gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", "))
+ }
+
+ // Check if we consumed just a few messages before stopping
+ if msgCount <= 10 {
+ log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)",
+ h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
+ } else {
+ log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)",
+ h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
+ float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
+ }
return nil
}
msgCount++
+ // Track gaps in offset sequence (indicates missed messages)
+ if lastTrackedOffset >= 0 && message.Offset != lastTrackedOffset+1 {
+ gap := message.Offset - lastTrackedOffset - 1
+ gapCount++
+ gapDesc := fmt.Sprintf("%d-%d", lastTrackedOffset+1, message.Offset-1)
+ gaps = append(gaps, gapDesc)
+ elapsed := time.Since(startTime)
+ log.Printf("Consumer %d: DEBUG offset gap in %s[%d] at %.1fs: offset %d -> %d (gap=%d messages, gapDesc=%s)",
+ h.consumer.id, topic, partition, elapsed.Seconds(), lastTrackedOffset, message.Offset, gap, gapDesc)
+ }
+ lastTrackedOffset = message.Offset
+
+ // Log progress every 500 messages OR every 5 seconds
+ now := time.Now()
+ if msgCount%500 == 0 || now.Sub(lastLogTime) > 5*time.Second {
+ elapsed := time.Since(startTime)
+ throughput := float64(msgCount) / elapsed.Seconds()
+ log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d",
+ h.consumer.id, topic, partition, msgCount, message.Offset, claim.HighWaterMarkOffset(), throughput, gapCount)
+ lastLogTime = now
+ }
+
// Process the message
var key []byte
if message.Key != nil {
@@ -589,24 +690,72 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
}
if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil {
- log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err)
+ log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v",
+ h.consumer.id, message.Topic, message.Partition, message.Offset, err)
h.consumer.metricsCollector.RecordConsumerError()
-
- // Add a small delay for schema validation or other processing errors to avoid overloading
- // select {
- // case <-time.After(100 * time.Millisecond):
- // // Continue after brief delay
- // case <-session.Context().Done():
- // return nil
- // }
} else {
+ // Track consumed message
+ if h.consumer.tracker != nil {
+ h.consumer.tracker.TrackConsumed(tracker.Record{
+ Key: string(key),
+ Topic: message.Topic,
+ Partition: message.Partition,
+ Offset: message.Offset,
+ Timestamp: message.Timestamp.UnixNano(),
+ ConsumerID: h.consumer.id,
+ })
+ }
+
// Mark message as processed
session.MarkMessage(message, "")
+
+ // Commit offset frequently to minimize both message loss and duplicates
+ // Every 20 messages balances:
+ // - ~600 commits per 12k messages (reasonable overhead)
+ // - ~20 message loss window if consumer fails
+ // - Reduces duplicate reads from rebalancing
+ if msgCount%20 == 0 {
+ session.Commit()
+ }
}
case <-session.Context().Done():
- log.Printf("Consumer %d: Session context cancelled for %s[%d]",
- h.consumer.id, claim.Topic(), claim.Partition())
+ elapsed := time.Since(startTime)
+ lastOffset := claim.HighWaterMarkOffset() - 1
+ gapSummary := "none"
+ if len(gaps) > 0 {
+ gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", "))
+ }
+
+ // Determine if we reached HWM
+ reachedHWM := lastTrackedOffset >= lastOffset
+ hwmStatus := "INCOMPLETE"
+ if reachedHWM {
+ // Assign (not redeclare) so the status is visible to the logging below
+ hwmStatus = "COMPLETE"
+ }
+
+ // Calculate consumption rate for this partition
+ consumptionRate := float64(0)
+ if elapsed.Seconds() > 0 {
+ consumptionRate = float64(msgCount) / elapsed.Seconds()
+ }
+
+ // Log both normal and abnormal completions
+ if msgCount == 0 {
+ // Partition never got ANY messages - critical issue
+ log.Printf("Consumer %d: CRITICAL - NO MESSAGES from %s[%d] (HWM=%d, status=%s)",
+ h.consumer.id, topic, partition, claim.HighWaterMarkOffset()-1, hwmStatus)
+ } else if msgCount < 10 { // msgCount > 0 is guaranteed by the branch above
+ // Very few messages then stopped - likely hung fetch
+ log.Printf("Consumer %d: HUNG FETCH on %s[%d]: only %d messages before stop at offset=%d (HWM=%d, rate=%.2f msgs/sec, gaps=%d %s)",
+ h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, consumptionRate, gapCount, gapSummary)
+ } else {
+ // Normal completion
+ log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)",
+ h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
+ consumptionRate, lastTrackedOffset, claim.HighWaterMarkOffset()-1, hwmStatus, gapCount, gapSummary)
+ }
return nil
}
}
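For reference, the handler methods changed above are driven by Sarama roughly as in the sketch below. It assumes the IBM/sarama import path, placeholder topics, and no metrics or tracking; it only shows the MarkMessage / periodic Commit / Cleanup-commit interplay that this diff tunes.

// Sketch: how a ConsumerGroupHandler with this commit strategy is typically run.
package loadtest

import (
	"context"
	"log"

	"github.com/IBM/sarama" // older modules use github.com/Shopify/sarama
)

type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error { return nil }

// Cleanup flushes marked offsets before partitions move to other members,
// which is what keeps duplicate reads small across rebalances.
func (handler) Cleanup(s sarama.ConsumerGroupSession) error {
	s.Commit()
	return nil
}

func (handler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
	n := 0
	for {
		select {
		case msg, ok := <-c.Messages():
			if !ok {
				return nil // channel closed, e.g. on rebalance
			}
			n++
			s.MarkMessage(msg, "") // mark only after the message is processed
			if n%20 == 0 {         // periodic commit, same cadence as above
				s.Commit()
			}
		case <-s.Context().Done():
			return nil
		}
	}
}

func run(ctx context.Context, group sarama.ConsumerGroup, topics []string) {
	// Consume returns whenever the group rebalances; keep calling it until
	// the context is cancelled so the member rejoins with a fresh session.
	for ctx.Err() == nil {
		if err := group.Consume(ctx, topics, handler{}); err != nil {
			log.Printf("consume error: %v", err)
		}
	}
}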