Diffstat (limited to 'test/kafka/kafka-client-loadtest/internal/consumer/consumer.go')
| -rw-r--r-- | test/kafka/kafka-client-loadtest/internal/consumer/consumer.go | 179 |
1 file changed, 164 insertions, 15 deletions
diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
index 1171bd5c0..6b23fdfe9 100644
--- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -6,6 +6,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"os"
+	"strings"
 	"sync"
 	"time"
 
@@ -14,6 +16,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
 	pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
 
 	"google.golang.org/protobuf/proto"
 )
@@ -35,10 +38,13 @@ type Consumer struct {
 	messagesProcessed int64
 	lastOffset        map[string]map[int32]int64
 	offsetMutex       sync.RWMutex
+
+	// Record tracking
+	tracker *tracker.Tracker
 }
 
 // New creates a new consumer instance
-func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) {
+func New(cfg *config.Config, collector *metrics.Collector, id int, recordTracker *tracker.Tracker) (*Consumer, error) {
 	// All consumers share the same group for load balancing across partitions
 	consumerGroup := cfg.Consumers.GroupPrefix
 
@@ -51,6 +57,7 @@ func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, e
 		useConfluent:  false, // Use Sarama by default
 		lastOffset:    make(map[string]map[int32]int64),
 		schemaFormats: make(map[string]string),
+		tracker:       recordTracker,
 	}
 
 	// Initialize schema formats for each topic (must match producer logic)
@@ -101,6 +108,9 @@ func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, e
 func (c *Consumer) initSaramaConsumer() error {
 	config := sarama.NewConfig()
 
+	// Enable Sarama debug logging to diagnose connection issues
+	sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[Sarama Consumer %d] ", c.id), log.LstdFlags)
+
 	// Consumer configuration
 	config.Consumer.Return.Errors = true
 	config.Consumer.Offsets.Initial = sarama.OffsetOldest
@@ -130,9 +140,24 @@ func (c *Consumer) initSaramaConsumer() error {
 	// This allows Sarama to fetch from multiple partitions in parallel
 	config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests
 
+	// Connection retry and timeout configuration
+	config.Net.DialTimeout = 30 * time.Second  // explicit 30s (the Sarama default)
+	config.Net.ReadTimeout = 30 * time.Second  // explicit 30s (the Sarama default)
+	config.Net.WriteTimeout = 30 * time.Second // explicit 30s (the Sarama default)
+	config.Metadata.Retry.Max = 5              // Retry metadata fetch up to 5 times
+	config.Metadata.Retry.Backoff = 500 * time.Millisecond
+	config.Metadata.Timeout = 30 * time.Second // Metadata request timeout
+
 	// Version
 	config.Version = sarama.V2_8_0_0
 
+	// CRITICAL: Set unique ClientID to ensure each consumer gets a unique member ID
+	// Without this, all consumers from the same process get the same member ID and only 1 joins!
+	// Sarama uses ClientID as part of the member ID generation
+	// Use consumer ID directly - no timestamp needed since IDs are already unique per process
+	config.ClientID = fmt.Sprintf("loadtest-consumer-%d", c.id)
+	log.Printf("Consumer %d: Setting Sarama ClientID to: %s", c.id, config.ClientID)
+
 	// Create consumer group
 	consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config)
 	if err != nil {
@@ -560,28 +585,104 @@ type ConsumerGroupHandler struct {
 }
 
 // Setup is run at the beginning of a new session, before ConsumeClaim
-func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
 	log.Printf("Consumer %d: Consumer group session setup", h.consumer.id)
+
+	// Log the generation ID and member ID for this session
+	log.Printf("Consumer %d: Generation=%d, MemberID=%s",
+		h.consumer.id, session.GenerationID(), session.MemberID())
+
+	// Log all assigned partitions
+	assignments := session.Claims()
+	totalPartitions := 0
+	for topic, partitions := range assignments {
+		for _, partition := range partitions {
+			totalPartitions++
+			log.Printf("Consumer %d: ASSIGNED %s[%d]",
+				h.consumer.id, topic, partition)
+		}
+	}
+	log.Printf("Consumer %d: Total partitions assigned: %d", h.consumer.id, totalPartitions)
 	return nil
 }
 
 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
-func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
-	log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id)
+// CRITICAL: Commit all marked offsets before partition reassignment to minimize duplicates
+func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+	log.Printf("Consumer %d: Consumer group session cleanup - committing final offsets before rebalance", h.consumer.id)
+
+	// Commit all marked offsets before releasing partitions
+	// This ensures that when partitions are reassigned to other consumers,
+	// they start from the last processed offset, minimizing duplicate reads
+	session.Commit()
+
+	log.Printf("Consumer %d: Cleanup complete - offsets committed", h.consumer.id)
 	return nil
 }
 
 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages()
 func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	msgCount := 0
+	topic := claim.Topic()
+	partition := claim.Partition()
+	initialOffset := claim.InitialOffset()
+	lastTrackedOffset := int64(-1)
+	gapCount := 0
+	var gaps []string // Track gap ranges for detailed analysis
+
+	// Log the starting offset for this partition
+	log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)",
+		h.consumer.id, topic, partition, initialOffset, claim.HighWaterMarkOffset())
+
+	startTime := time.Now()
+	lastLogTime := time.Now()
+
 	for {
 		select {
 		case message, ok := <-claim.Messages():
 			if !ok {
+				elapsed := time.Since(startTime)
+				// Log detailed gap analysis
+				gapSummary := "none"
+				if len(gaps) > 0 {
+					gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", "))
+				}
+
+				// Check if we consumed just a few messages before stopping
+				if msgCount <= 10 {
+					log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)",
+						h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
+				} else {
log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)", + h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), + float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) + } return nil } msgCount++ + // Track gaps in offset sequence (indicates missed messages) + if lastTrackedOffset >= 0 && message.Offset != lastTrackedOffset+1 { + gap := message.Offset - lastTrackedOffset - 1 + gapCount++ + gapDesc := fmt.Sprintf("%d-%d", lastTrackedOffset+1, message.Offset-1) + gaps = append(gaps, gapDesc) + elapsed := time.Since(startTime) + log.Printf("Consumer %d: DEBUG offset gap in %s[%d] at %.1fs: offset %d -> %d (gap=%d messages, gapDesc=%s)", + h.consumer.id, topic, partition, elapsed.Seconds(), lastTrackedOffset, message.Offset, gap, gapDesc) + } + lastTrackedOffset = message.Offset + + // Log progress every 500 messages OR every 5 seconds + now := time.Now() + if msgCount%500 == 0 || now.Sub(lastLogTime) > 5*time.Second { + elapsed := time.Since(startTime) + throughput := float64(msgCount) / elapsed.Seconds() + log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d", + h.consumer.id, topic, partition, msgCount, message.Offset, claim.HighWaterMarkOffset(), throughput, gapCount) + lastLogTime = now + } + // Process the message var key []byte if message.Key != nil { @@ -589,24 +690,72 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, } if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil { - log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err) + log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v", + h.consumer.id, message.Topic, message.Partition, message.Offset, err) h.consumer.metricsCollector.RecordConsumerError() - - // Add a small delay for schema validation or other processing errors to avoid overloading - // select { - // case <-time.After(100 * time.Millisecond): - // // Continue after brief delay - // case <-session.Context().Done(): - // return nil - // } } else { + // Track consumed message + if h.consumer.tracker != nil { + h.consumer.tracker.TrackConsumed(tracker.Record{ + Key: string(key), + Topic: message.Topic, + Partition: message.Partition, + Offset: message.Offset, + Timestamp: message.Timestamp.UnixNano(), + ConsumerID: h.consumer.id, + }) + } + // Mark message as processed session.MarkMessage(message, "") + + // Commit offset frequently to minimize both message loss and duplicates + // Every 20 messages balances: + // - ~600 commits per 12k messages (reasonable overhead) + // - ~20 message loss window if consumer fails + // - Reduces duplicate reads from rebalancing + if msgCount%20 == 0 { + session.Commit() + } } case <-session.Context().Done(): - log.Printf("Consumer %d: Session context cancelled for %s[%d]", - h.consumer.id, claim.Topic(), claim.Partition()) + elapsed := time.Since(startTime) + lastOffset := claim.HighWaterMarkOffset() - 1 + gapSummary := "none" + if len(gaps) > 0 { + gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", ")) + } + + // Determine if we reached HWM + reachedHWM := lastTrackedOffset >= lastOffset + hwmStatus := "INCOMPLETE" + if reachedHWM { + hwmStatus := "COMPLETE" + _ = hwmStatus // Use it to avoid warning + } + + // Calculate consumption rate for this partition + consumptionRate := float64(0) + if elapsed.Seconds() > 0 { + 
+				consumptionRate = float64(msgCount) / elapsed.Seconds()
+			}
+
+			// Log both normal and abnormal completions
+			if msgCount == 0 {
+				// Partition never got ANY messages - critical issue
+				log.Printf("Consumer %d: CRITICAL - NO MESSAGES from %s[%d] (HWM=%d, status=%s)",
+					h.consumer.id, topic, partition, claim.HighWaterMarkOffset()-1, hwmStatus)
+			} else if msgCount < 10 && msgCount > 0 {
+				// Very few messages then stopped - likely hung fetch
+				log.Printf("Consumer %d: HUNG FETCH on %s[%d]: only %d messages before stop at offset=%d (HWM=%d, rate=%.2f msgs/sec, gaps=%d %s)",
+					h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, consumptionRate, gapCount, gapSummary)
+			} else {
+				// Normal completion
+				log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)",
+					h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
+					consumptionRate, lastTrackedOffset, claim.HighWaterMarkOffset()-1, hwmStatus, gapCount, gapSummary)
+			}
 			return nil
 		}
 	}
