aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/joingroup.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/protocol/joingroup.go')
-rw-r--r-- weed/mq/kafka/protocol/joingroup.go 113
1 files changed, 73 insertions, 40 deletions
diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go
index 27d8d8811..85a632070 100644
--- a/weed/mq/kafka/protocol/joingroup.go
+++ b/weed/mq/kafka/protocol/joingroup.go
@@ -7,6 +7,7 @@ import (
"sort"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
@@ -82,6 +83,16 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
var isNewMember bool
var existingMember *consumer.GroupMember
+ // Use the actual ClientID from Kafka protocol header for unique member ID generation
+ clientKey := connContext.ClientID
+ if clientKey == "" {
+ // Fallback to deterministic key if ClientID not available
+ clientKey = fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)
+ glog.Warningf("[JoinGroup] No ClientID in ConnectionContext for group %s, using fallback: %s", request.GroupID, clientKey)
+ } else {
+ glog.V(1).Infof("[JoinGroup] Using ClientID from ConnectionContext for group %s: %s", request.GroupID, clientKey)
+ }
+
// Check for static membership first
if request.GroupInstanceID != "" {
existingMember = h.groupCoordinator.FindStaticMemberLocked(group, request.GroupInstanceID)
@@ -95,8 +106,6 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
}
} else {
// Dynamic membership logic
- clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)
-
if request.MemberID == "" {
// New member - check if we already have a member for this client
var existingMemberID string
@@ -155,12 +164,9 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
groupInstanceID = &request.GroupInstanceID
}
- // Use deterministic client identifier based on group + session timeout + protocol
- clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)
-
member := &consumer.GroupMember{
ID: memberID,
- ClientID: clientKey, // Use deterministic client key for member identification
+ ClientID: clientKey, // Kafka ClientID from the connection, or the deterministic fallback key when it is empty
ClientHost: clientHost, // Now extracted from actual connection
GroupInstanceID: groupInstanceID,
SessionTimeout: request.SessionTimeout,
@@ -231,7 +237,7 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
// Ensure we have a valid protocol - fallback to "range" if empty
if groupProtocol == "" {
- groupProtocol = "range"
+ groupProtocol = consumer.ProtocolNameRange
}
// If a protocol is already selected for the group, reject joins that do not support it.
@@ -266,8 +272,6 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
Version: apiVersion,
}
- // Debug logging for JoinGroup response
-
// If this member is the leader, include all member info for assignment
if memberID == group.Leader {
response.Members = make([]JoinGroupMember, 0, len(group.Members))
@@ -310,7 +314,7 @@ func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGr
var groupID string
if isFlexible {
// Flexible protocol uses compact strings
- endIdx := offset + 20 // Show more bytes for debugging
+ endIdx := offset + 20
if endIdx > len(data) {
endIdx = len(data)
}
@@ -571,8 +575,6 @@ func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGr
}
func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
- // Debug logging for JoinGroup response
-
// Flexible response for v6+
if IsFlexibleVersion(11, response.Version) {
out := make([]byte, 0, 256)
@@ -614,7 +616,7 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
} else {
// NON-nullable compact string in v6 - must not be empty!
if response.ProtocolName == "" {
- response.ProtocolName = "range" // fallback to default
+ response.ProtocolName = consumer.ProtocolNameRange // fallback to default
}
out = append(out, FlexibleString(response.ProtocolName)...)
}
@@ -761,9 +763,9 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in
ThrottleTimeMs: 0,
ErrorCode: errorCode,
GenerationID: -1,
- ProtocolName: "range", // Use "range" as default protocol instead of empty string
- Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field
- MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field
+ ProtocolName: consumer.ProtocolNameRange, // Use "range" as default protocol instead of empty string
+ Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field
+ MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field
Version: apiVersion,
Members: []JoinGroupMember{},
}
@@ -773,7 +775,6 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in
// extractSubscriptionFromProtocolsEnhanced uses improved metadata parsing with better error handling
func (h *Handler) extractSubscriptionFromProtocolsEnhanced(protocols []GroupProtocol) []string {
- // Analyze protocol metadata for debugging
debugInfo := AnalyzeProtocolMetadata(protocols)
for _, info := range debugInfo {
if info.ParsedOK {
@@ -862,10 +863,16 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
}
// Check if this is the group leader with assignments
+ glog.V(2).Infof("[SYNCGROUP] Member=%s Leader=%s GroupState=%s HasAssignments=%v MemberCount=%d Gen=%d",
+ request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID)
+
if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 {
// Leader is providing assignments - process and store them
+ glog.V(2).Infof("[SYNCGROUP] Leader %s providing client-side assignments for group %s (%d assignments)",
+ request.MemberID, request.GroupID, len(request.GroupAssignments))
err = h.processGroupAssignments(group, request.GroupAssignments)
if err != nil {
+ glog.Errorf("[SYNCGROUP] ERROR processing leader assignments: %v", err)
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil
}
@@ -876,11 +883,19 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
for _, m := range group.Members {
m.State = consumer.MemberStateStable
}
- } else if group.State == consumer.GroupStateCompletingRebalance {
- // Non-leader member waiting for assignments
- // Assignments should already be processed by leader
+ glog.V(2).Infof("[SYNCGROUP] Leader assignments processed successfully, group now STABLE")
+ } else if request.MemberID != group.Leader && len(request.GroupAssignments) == 0 {
+ // Non-leader member requesting its assignment
+ // Non-leader members should ALWAYS wait for leader's client-side assignments
+ // This is the correct behavior for Sarama and other client-side assignment protocols
+ glog.V(3).Infof("[SYNCGROUP] Non-leader %s waiting for/retrieving assignment in group %s (state=%s)",
+ request.MemberID, request.GroupID, group.State)
+ // Assignment will be retrieved from member.Assignment below
} else {
- // Trigger partition assignment using built-in strategy
+ // Trigger partition assignment using built-in strategy (server-side assignment)
+ // This should only happen for server-side assignment protocols (not Sarama's client-side)
+ glog.Warningf("[SYNCGROUP] Using server-side assignment for group %s (Leader=%s State=%s) - this should not happen with Sarama!",
+ request.GroupID, group.Leader, group.State)
topicPartitions := h.getTopicPartitions(group)
group.AssignPartitions(topicPartitions)
@@ -901,6 +916,10 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
assignment = h.serializeMemberAssignment(member.Assignment)
}
+ // Log member assignment details
+ glog.V(3).Infof("[SYNCGROUP] Member %s in group %s assigned %d partitions: %v",
+ request.MemberID, request.GroupID, len(member.Assignment), member.Assignment)
+
// Build response
response := SyncGroupResponse{
CorrelationID: correlationID,
@@ -908,7 +927,6 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
Assignment: assignment,
}
- // Log assignment details for debugging
assignmentPreview := assignment
if len(assignmentPreview) > 100 {
assignmentPreview = assignment[:100]
@@ -1092,7 +1110,7 @@ func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGr
offset += int(assignLength)
}
- // CRITICAL FIX: Flexible format requires tagged fields after each assignment struct
+ // Flexible format requires tagged fields after each assignment struct
if offset < len(data) {
_, taggedConsumed, tagErr := DecodeTaggedFields(data[offset:])
if tagErr == nil {
@@ -1171,7 +1189,7 @@ func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse, apiVersion
// Assignment - FLEXIBLE V4+ FIX
if IsFlexibleVersion(14, apiVersion) {
// FLEXIBLE FORMAT: Assignment as compact bytes
- // CRITICAL FIX: Use CompactStringLength for compact bytes (not CompactArrayLength)
+ // Use CompactStringLength for compact bytes (not CompactArrayLength)
// Compact bytes use the same encoding as compact strings: 0 = null, 1 = empty, n+1 = length n
assignmentLen := len(response.Assignment)
if assignmentLen == 0 {
@@ -1209,6 +1227,8 @@ func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode in
func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignments []GroupAssignment) error {
// Apply leader-provided assignments
+ glog.V(2).Infof("[PROCESS_ASSIGNMENTS] Processing %d member assignments from leader", len(assignments))
+
// Clear current assignments
for _, m := range group.Members {
m.Assignment = nil
@@ -1218,14 +1238,17 @@ func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignm
m, ok := group.Members[ga.MemberID]
if !ok {
// Skip unknown members
+ glog.V(1).Infof("[PROCESS_ASSIGNMENTS] Skipping unknown member: %s", ga.MemberID)
continue
}
parsed, err := h.parseMemberAssignment(ga.Assignment)
if err != nil {
+ glog.Errorf("[PROCESS_ASSIGNMENTS] Failed to parse assignment for member %s: %v", ga.MemberID, err)
return err
}
m.Assignment = parsed
+ glog.V(3).Infof("[PROCESS_ASSIGNMENTS] Member %s assigned %d partitions: %v", ga.MemberID, len(parsed), parsed)
}
return nil
@@ -1304,16 +1327,19 @@ func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][
// Get partition info for all subscribed topics
for topic := range group.SubscribedTopics {
- // Check if topic exists using SeaweedMQ handler
- if h.seaweedMQHandler.TopicExists(topic) {
- // For now, assume 1 partition per topic (can be extended later)
- // In a real implementation, this would query SeaweedMQ for actual partition count
- partitions := []int32{0}
- topicPartitions[topic] = partitions
- } else {
- // Default to single partition if topic not found
- topicPartitions[topic] = []int32{0}
+ // Get actual partition count from topic info
+ topicInfo, exists := h.seaweedMQHandler.GetTopicInfo(topic)
+ partitionCount := h.GetDefaultPartitions() // Use configurable default
+ if exists && topicInfo != nil {
+ partitionCount = topicInfo.Partitions
}
+
+ // Create partition list: [0, 1, 2, ...]
+ partitions := make([]int32, partitionCount)
+ for i := int32(0); i < partitionCount; i++ {
+ partitions[i] = i
+ }
+ topicPartitions[topic] = partitions
}
return topicPartitions
@@ -1323,13 +1349,15 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou
// Schema Registry expects a JSON assignment in the format:
// {"error":0,"master":"member-id","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":"7.4.0-ce"}}
- // CRITICAL FIX: Extract the actual leader's identity from the leader's metadata
+ // Extract the actual leader's identity from the leader's metadata
// to avoid localhost/hostname mismatch that causes Schema Registry to forward
// requests to itself
leaderMember, exists := group.Members[group.Leader]
if !exists {
- // Fallback if leader not found (shouldn't happen)
- jsonAssignment := `{"error":0,"master":"","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":1}}`
+ // Leader not found - return minimal assignment with no master identity
+ // Schema Registry should handle this by failing over to another instance
+ glog.Warningf("Schema Registry leader member %s not found in group %s", group.Leader, group.ID)
+ jsonAssignment := `{"error":0,"master":"","master_identity":{"host":"","port":0,"master_eligibility":false,"scheme":"http","version":1}}`
return []byte(jsonAssignment)
}
@@ -1338,13 +1366,16 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou
var identity map[string]interface{}
err := json.Unmarshal(leaderMember.Metadata, &identity)
if err != nil {
- // Fallback to basic assignment
- jsonAssignment := fmt.Sprintf(`{"error":0,"master":"%s","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":1}}`, group.Leader)
+ // Failed to parse metadata - return minimal assignment
+ // Schema Registry should provide valid metadata; if not, fail gracefully
+ glog.Warningf("Failed to parse Schema Registry metadata for leader %s: %v", group.Leader, err)
+ jsonAssignment := fmt.Sprintf(`{"error":0,"master":"%s","master_identity":{"host":"","port":0,"master_eligibility":false,"scheme":"http","version":1}}`, group.Leader)
return []byte(jsonAssignment)
}
- // Extract fields with defaults
- host := "localhost"
+ // Extract fields from identity - host defaults to empty if missing; port, scheme and version keep the defaults below
+ // Schema Registry clients should provide complete metadata
+ host := ""
port := 8081
scheme := "http"
version := 1
@@ -1352,6 +1383,8 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou
if h, ok := identity["host"].(string); ok {
host = h
+ } else {
+ glog.V(1).Infof("Schema Registry metadata missing 'host' field for leader %s", group.Leader)
}
if p, ok := identity["port"].(float64); ok {
port = int(p)