diff options
Diffstat (limited to 'weed/mq/kafka/protocol/joingroup.go')
| -rw-r--r-- | weed/mq/kafka/protocol/joingroup.go | 113 |
1 file changed, 73 insertions, 40 deletions
diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 27d8d8811..85a632070 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -7,6 +7,7 @@ import ( "sort" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) @@ -82,6 +83,16 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID var isNewMember bool var existingMember *consumer.GroupMember + // Use the actual ClientID from Kafka protocol header for unique member ID generation + clientKey := connContext.ClientID + if clientKey == "" { + // Fallback to deterministic key if ClientID not available + clientKey = fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType) + glog.Warningf("[JoinGroup] No ClientID in ConnectionContext for group %s, using fallback: %s", request.GroupID, clientKey) + } else { + glog.V(1).Infof("[JoinGroup] Using ClientID from ConnectionContext for group %s: %s", request.GroupID, clientKey) + } + // Check for static membership first if request.GroupInstanceID != "" { existingMember = h.groupCoordinator.FindStaticMemberLocked(group, request.GroupInstanceID) @@ -95,8 +106,6 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID } } else { // Dynamic membership logic - clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType) - if request.MemberID == "" { // New member - check if we already have a member for this client var existingMemberID string @@ -155,12 +164,9 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID groupInstanceID = &request.GroupInstanceID } - // Use deterministic client identifier based on group + session timeout + protocol - clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType) - member := &consumer.GroupMember{ ID: memberID, - ClientID: 
clientKey, // Use deterministic client key for member identification + ClientID: clientKey, // Use actual Kafka ClientID for unique member identification ClientHost: clientHost, // Now extracted from actual connection GroupInstanceID: groupInstanceID, SessionTimeout: request.SessionTimeout, @@ -231,7 +237,7 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID // Ensure we have a valid protocol - fallback to "range" if empty if groupProtocol == "" { - groupProtocol = "range" + groupProtocol = consumer.ProtocolNameRange } // If a protocol is already selected for the group, reject joins that do not support it. @@ -266,8 +272,6 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID Version: apiVersion, } - // Debug logging for JoinGroup response - // If this member is the leader, include all member info for assignment if memberID == group.Leader { response.Members = make([]JoinGroupMember, 0, len(group.Members)) @@ -310,7 +314,7 @@ func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGr var groupID string if isFlexible { // Flexible protocol uses compact strings - endIdx := offset + 20 // Show more bytes for debugging + endIdx := offset + 20 if endIdx > len(data) { endIdx = len(data) } @@ -571,8 +575,6 @@ func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGr } func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { - // Debug logging for JoinGroup response - // Flexible response for v6+ if IsFlexibleVersion(11, response.Version) { out := make([]byte, 0, 256) @@ -614,7 +616,7 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { } else { // NON-nullable compact string in v6 - must not be empty! 
if response.ProtocolName == "" { - response.ProtocolName = "range" // fallback to default + response.ProtocolName = consumer.ProtocolNameRange // fallback to default } out = append(out, FlexibleString(response.ProtocolName)...) } @@ -761,9 +763,9 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in ThrottleTimeMs: 0, ErrorCode: errorCode, GenerationID: -1, - ProtocolName: "range", // Use "range" as default protocol instead of empty string - Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field - MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field + ProtocolName: consumer.ProtocolNameRange, // Use "range" as default protocol instead of empty string + Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field + MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field Version: apiVersion, Members: []JoinGroupMember{}, } @@ -773,7 +775,6 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in // extractSubscriptionFromProtocolsEnhanced uses improved metadata parsing with better error handling func (h *Handler) extractSubscriptionFromProtocolsEnhanced(protocols []GroupProtocol) []string { - // Analyze protocol metadata for debugging debugInfo := AnalyzeProtocolMetadata(protocols) for _, info := range debugInfo { if info.ParsedOK { @@ -862,10 +863,16 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque } // Check if this is the group leader with assignments + glog.V(2).Infof("[SYNCGROUP] Member=%s Leader=%s GroupState=%s HasAssignments=%v MemberCount=%d Gen=%d", + request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID) + if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { // Leader is providing assignments - process and store them + glog.V(2).Infof("[SYNCGROUP] Leader %s providing 
client-side assignments for group %s (%d assignments)", + request.MemberID, request.GroupID, len(request.GroupAssignments)) err = h.processGroupAssignments(group, request.GroupAssignments) if err != nil { + glog.Errorf("[SYNCGROUP] ERROR processing leader assignments: %v", err) return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil } @@ -876,11 +883,19 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque for _, m := range group.Members { m.State = consumer.MemberStateStable } - } else if group.State == consumer.GroupStateCompletingRebalance { - // Non-leader member waiting for assignments - // Assignments should already be processed by leader + glog.V(2).Infof("[SYNCGROUP] Leader assignments processed successfully, group now STABLE") + } else if request.MemberID != group.Leader && len(request.GroupAssignments) == 0 { + // Non-leader member requesting its assignment + // CRITICAL FIX: Non-leader members should ALWAYS wait for leader's client-side assignments + // This is the correct behavior for Sarama and other client-side assignment protocols + glog.V(3).Infof("[SYNCGROUP] Non-leader %s waiting for/retrieving assignment in group %s (state=%s)", + request.MemberID, request.GroupID, group.State) + // Assignment will be retrieved from member.Assignment below } else { - // Trigger partition assignment using built-in strategy + // Trigger partition assignment using built-in strategy (server-side assignment) + // This should only happen for server-side assignment protocols (not Sarama's client-side) + glog.Warningf("[SYNCGROUP] Using server-side assignment for group %s (Leader=%s State=%s) - this should not happen with Sarama!", + request.GroupID, group.Leader, group.State) topicPartitions := h.getTopicPartitions(group) group.AssignPartitions(topicPartitions) @@ -901,6 +916,10 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque assignment = 
h.serializeMemberAssignment(member.Assignment) } + // Log member assignment details + glog.V(3).Infof("[SYNCGROUP] Member %s in group %s assigned %d partitions: %v", + request.MemberID, request.GroupID, len(member.Assignment), member.Assignment) + // Build response response := SyncGroupResponse{ CorrelationID: correlationID, @@ -908,7 +927,6 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque Assignment: assignment, } - // Log assignment details for debugging assignmentPreview := assignment if len(assignmentPreview) > 100 { assignmentPreview = assignment[:100] @@ -1092,7 +1110,7 @@ func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGr offset += int(assignLength) } - // CRITICAL FIX: Flexible format requires tagged fields after each assignment struct + // Flexible format requires tagged fields after each assignment struct if offset < len(data) { _, taggedConsumed, tagErr := DecodeTaggedFields(data[offset:]) if tagErr == nil { @@ -1171,7 +1189,7 @@ func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse, apiVersion // Assignment - FLEXIBLE V4+ FIX if IsFlexibleVersion(14, apiVersion) { // FLEXIBLE FORMAT: Assignment as compact bytes - // CRITICAL FIX: Use CompactStringLength for compact bytes (not CompactArrayLength) + // Use CompactStringLength for compact bytes (not CompactArrayLength) // Compact bytes use the same encoding as compact strings: 0 = null, 1 = empty, n+1 = length n assignmentLen := len(response.Assignment) if assignmentLen == 0 { @@ -1209,6 +1227,8 @@ func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode in func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignments []GroupAssignment) error { // Apply leader-provided assignments + glog.V(2).Infof("[PROCESS_ASSIGNMENTS] Processing %d member assignments from leader", len(assignments)) + // Clear current assignments for _, m := range group.Members { m.Assignment = nil @@ -1218,14 
+1238,17 @@ func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignm m, ok := group.Members[ga.MemberID] if !ok { // Skip unknown members + glog.V(1).Infof("[PROCESS_ASSIGNMENTS] Skipping unknown member: %s", ga.MemberID) continue } parsed, err := h.parseMemberAssignment(ga.Assignment) if err != nil { + glog.Errorf("[PROCESS_ASSIGNMENTS] Failed to parse assignment for member %s: %v", ga.MemberID, err) return err } m.Assignment = parsed + glog.V(3).Infof("[PROCESS_ASSIGNMENTS] Member %s assigned %d partitions: %v", ga.MemberID, len(parsed), parsed) } return nil @@ -1304,16 +1327,19 @@ func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][ // Get partition info for all subscribed topics for topic := range group.SubscribedTopics { - // Check if topic exists using SeaweedMQ handler - if h.seaweedMQHandler.TopicExists(topic) { - // For now, assume 1 partition per topic (can be extended later) - // In a real implementation, this would query SeaweedMQ for actual partition count - partitions := []int32{0} - topicPartitions[topic] = partitions - } else { - // Default to single partition if topic not found - topicPartitions[topic] = []int32{0} + // Get actual partition count from topic info + topicInfo, exists := h.seaweedMQHandler.GetTopicInfo(topic) + partitionCount := h.GetDefaultPartitions() // Use configurable default + if exists && topicInfo != nil { + partitionCount = topicInfo.Partitions } + + // Create partition list: [0, 1, 2, ...] 
+ partitions := make([]int32, partitionCount) + for i := int32(0); i < partitionCount; i++ { + partitions[i] = i + } + topicPartitions[topic] = partitions } return topicPartitions @@ -1323,13 +1349,15 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou // Schema Registry expects a JSON assignment in the format: // {"error":0,"master":"member-id","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":"7.4.0-ce"}} - // CRITICAL FIX: Extract the actual leader's identity from the leader's metadata + // Extract the actual leader's identity from the leader's metadata // to avoid localhost/hostname mismatch that causes Schema Registry to forward // requests to itself leaderMember, exists := group.Members[group.Leader] if !exists { - // Fallback if leader not found (shouldn't happen) - jsonAssignment := `{"error":0,"master":"","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":1}}` + // Leader not found - return minimal assignment with no master identity + // Schema Registry should handle this by failing over to another instance + glog.Warningf("Schema Registry leader member %s not found in group %s", group.Leader, group.ID) + jsonAssignment := `{"error":0,"master":"","master_identity":{"host":"","port":0,"master_eligibility":false,"scheme":"http","version":1}}` return []byte(jsonAssignment) } @@ -1338,13 +1366,16 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou var identity map[string]interface{} err := json.Unmarshal(leaderMember.Metadata, &identity) if err != nil { - // Fallback to basic assignment - jsonAssignment := fmt.Sprintf(`{"error":0,"master":"%s","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":1}}`, group.Leader) + // Failed to parse metadata - return minimal assignment + // Schema Registry should provide valid metadata; if not, fail gracefully + 
glog.Warningf("Failed to parse Schema Registry metadata for leader %s: %v", group.Leader, err) + jsonAssignment := fmt.Sprintf(`{"error":0,"master":"%s","master_identity":{"host":"","port":0,"master_eligibility":false,"scheme":"http","version":1}}`, group.Leader) return []byte(jsonAssignment) } - // Extract fields with defaults - host := "localhost" + // Extract fields from identity - use empty/zero defaults if missing + // Schema Registry clients should provide complete metadata + host := "" port := 8081 scheme := "http" version := 1 @@ -1352,6 +1383,8 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou if h, ok := identity["host"].(string); ok { host = h + } else { + glog.V(1).Infof("Schema Registry metadata missing 'host' field for leader %s", group.Leader) } if p, ok := identity["port"].(float64); ok { port = int(p) |
