aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/joingroup.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/protocol/joingroup.go')
-rw-r--r--weed/mq/kafka/protocol/joingroup.go1435
1 files changed, 1435 insertions, 0 deletions
diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go
new file mode 100644
index 000000000..27d8d8811
--- /dev/null
+++ b/weed/mq/kafka/protocol/joingroup.go
@@ -0,0 +1,1435 @@
+package protocol
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "sort"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
+)
+
+// JoinGroup API (key 11) - Consumer group protocol
+// Handles consumer joining a consumer group and initial coordination
+
+// JoinGroupRequest represents a JoinGroup request from a Kafka client
+type JoinGroupRequest struct {
+ GroupID string
+ SessionTimeout int32
+ RebalanceTimeout int32
+ MemberID string // Empty for new members
+ GroupInstanceID string // Optional static membership
+ ProtocolType string // "consumer" for regular consumers
+ GroupProtocols []GroupProtocol
+}
+
+// GroupProtocol represents a supported assignment protocol
+type GroupProtocol struct {
+ Name string
+ Metadata []byte
+}
+
+// JoinGroupResponse represents a JoinGroup response to a Kafka client
+type JoinGroupResponse struct {
+ CorrelationID uint32
+ ThrottleTimeMs int32 // versions 2+
+ ErrorCode int16
+ GenerationID int32
+ ProtocolName string // NOT nullable in v6, nullable in v7+
+ Leader string // NOT nullable
+ MemberID string
+ Version uint16
+ Members []JoinGroupMember // Only populated for group leader
+}
+
+// JoinGroupMember represents member info sent to group leader
+type JoinGroupMember struct {
+ MemberID string
+ GroupInstanceID string
+ Metadata []byte
+}
+
+// Error codes for JoinGroup are imported from errors.go
+
+func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+ // Parse JoinGroup request
+ request, err := h.parseJoinGroupRequest(requestBody, apiVersion)
+ if err != nil {
+ return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
+ }
+
+ // Validate request
+ if request.GroupID == "" {
+ return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
+ }
+
+ if !h.groupCoordinator.ValidateSessionTimeout(request.SessionTimeout) {
+ return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidSessionTimeout, apiVersion), nil
+ }
+
+ // Get or create consumer group
+ group := h.groupCoordinator.GetOrCreateGroup(request.GroupID)
+
+ group.Mu.Lock()
+ defer group.Mu.Unlock()
+
+ // Update group's last activity
+ group.LastActivity = time.Now()
+
+ // Handle member ID logic with static membership support
+ var memberID string
+ var isNewMember bool
+ var existingMember *consumer.GroupMember
+
+ // Check for static membership first
+ if request.GroupInstanceID != "" {
+ existingMember = h.groupCoordinator.FindStaticMemberLocked(group, request.GroupInstanceID)
+ if existingMember != nil {
+ memberID = existingMember.ID
+ isNewMember = false
+ } else {
+ // New static member
+ memberID = h.groupCoordinator.GenerateMemberID(request.GroupInstanceID, "static")
+ isNewMember = true
+ }
+ } else {
+ // Dynamic membership logic
+ clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)
+
+ if request.MemberID == "" {
+ // New member - check if we already have a member for this client
+ var existingMemberID string
+ for existingID, member := range group.Members {
+ if member.ClientID == clientKey && !h.groupCoordinator.IsStaticMember(member) {
+ existingMemberID = existingID
+ break
+ }
+ }
+
+ if existingMemberID != "" {
+ // Reuse existing member ID for this client
+ memberID = existingMemberID
+ isNewMember = false
+ } else {
+ // Generate new deterministic member ID
+ memberID = h.groupCoordinator.GenerateMemberID(clientKey, "consumer")
+ isNewMember = true
+ }
+ } else {
+ memberID = request.MemberID
+ // Check if member exists
+ if _, exists := group.Members[memberID]; !exists {
+ // Member ID provided but doesn't exist - reject
+ return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil
+ }
+ isNewMember = false
+ }
+ }
+
+ // Check group state
+ switch group.State {
+ case consumer.GroupStateEmpty, consumer.GroupStateStable:
+ // Can join or trigger rebalance
+ if isNewMember || len(group.Members) == 0 {
+ group.State = consumer.GroupStatePreparingRebalance
+ group.Generation++
+ }
+ case consumer.GroupStatePreparingRebalance:
+ // Rebalance in progress - if this is the leader and we have members, transition to CompletingRebalance
+ if len(group.Members) > 0 && memberID == group.Leader {
+ group.State = consumer.GroupStateCompletingRebalance
+ }
+ case consumer.GroupStateCompletingRebalance:
+ // Allow join but don't change generation until SyncGroup
+ case consumer.GroupStateDead:
+ return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
+ }
+
+ // Extract client host from connection context
+ clientHost := ExtractClientHost(connContext)
+
+ // Create or update member with enhanced metadata parsing
+ var groupInstanceID *string
+ if request.GroupInstanceID != "" {
+ groupInstanceID = &request.GroupInstanceID
+ }
+
+ // Use deterministic client identifier based on group + session timeout + protocol
+ clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)
+
+ member := &consumer.GroupMember{
+ ID: memberID,
+ ClientID: clientKey, // Use deterministic client key for member identification
+ ClientHost: clientHost, // Now extracted from actual connection
+ GroupInstanceID: groupInstanceID,
+ SessionTimeout: request.SessionTimeout,
+ RebalanceTimeout: request.RebalanceTimeout,
+ Subscription: h.extractSubscriptionFromProtocolsEnhanced(request.GroupProtocols),
+ State: consumer.MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now(),
+ }
+
+ // Add or update the member in the group before computing subscriptions or leader
+ if group.Members == nil {
+ group.Members = make(map[string]*consumer.GroupMember)
+ }
+ group.Members[memberID] = member
+
+ // Store consumer group and member ID in connection context for use in fetch requests
+ connContext.ConsumerGroup = request.GroupID
+ connContext.MemberID = memberID
+
+ // Store protocol metadata for leader
+ if len(request.GroupProtocols) > 0 {
+ if len(request.GroupProtocols[0].Metadata) == 0 {
+ // Generate subscription metadata for available topics
+ availableTopics := h.getAvailableTopics()
+
+ metadata := make([]byte, 0, 64)
+ // Version (2 bytes) - use version 0
+ metadata = append(metadata, 0, 0)
+ // Topics count (4 bytes)
+ topicsCount := make([]byte, 4)
+ binary.BigEndian.PutUint32(topicsCount, uint32(len(availableTopics)))
+ metadata = append(metadata, topicsCount...)
+ // Topics (string array)
+ for _, topic := range availableTopics {
+ topicLen := make([]byte, 2)
+ binary.BigEndian.PutUint16(topicLen, uint16(len(topic)))
+ metadata = append(metadata, topicLen...)
+ metadata = append(metadata, []byte(topic)...)
+ }
+ // UserData length (4 bytes) - empty
+ metadata = append(metadata, 0, 0, 0, 0)
+ member.Metadata = metadata
+ } else {
+ member.Metadata = request.GroupProtocols[0].Metadata
+ }
+ }
+
+ // Add member to group
+ group.Members[memberID] = member
+
+ // Register static member if applicable
+ if member.GroupInstanceID != nil && *member.GroupInstanceID != "" {
+ h.groupCoordinator.RegisterStaticMemberLocked(group, member)
+ }
+
+ // Update group's subscribed topics
+ h.updateGroupSubscription(group)
+
+ // Select assignment protocol using enhanced selection logic
+ // If the group already has a selected protocol, enforce compatibility with it.
+ existingProtocols := make([]string, 0, 1)
+ if group.Protocol != "" {
+ existingProtocols = append(existingProtocols, group.Protocol)
+ }
+
+ groupProtocol := SelectBestProtocol(request.GroupProtocols, existingProtocols)
+
+ // Ensure we have a valid protocol - fallback to "range" if empty
+ if groupProtocol == "" {
+ groupProtocol = "range"
+ }
+
+ // If a protocol is already selected for the group, reject joins that do not support it.
+ if len(existingProtocols) > 0 && (groupProtocol == "" || groupProtocol != group.Protocol) {
+ // Rollback member addition and static registration before returning error
+ delete(group.Members, memberID)
+ if member.GroupInstanceID != nil && *member.GroupInstanceID != "" {
+ h.groupCoordinator.UnregisterStaticMemberLocked(group, *member.GroupInstanceID)
+ }
+ // Recompute group subscription without the rejected member
+ h.updateGroupSubscription(group)
+ return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil
+ }
+
+ group.Protocol = groupProtocol
+
+ // Select group leader (first member or keep existing if still present)
+ if group.Leader == "" || group.Members[group.Leader] == nil {
+ group.Leader = memberID
+ } else {
+ }
+
+ // Build response - use the requested API version
+ response := JoinGroupResponse{
+ CorrelationID: correlationID,
+ ThrottleTimeMs: 0,
+ ErrorCode: ErrorCodeNone,
+ GenerationID: group.Generation,
+ ProtocolName: groupProtocol,
+ Leader: group.Leader,
+ MemberID: memberID,
+ Version: apiVersion,
+ }
+
+ // Debug logging for JoinGroup response
+
+ // If this member is the leader, include all member info for assignment
+ if memberID == group.Leader {
+ response.Members = make([]JoinGroupMember, 0, len(group.Members))
+ for mid, m := range group.Members {
+ instanceID := ""
+ if m.GroupInstanceID != nil {
+ instanceID = *m.GroupInstanceID
+ }
+ response.Members = append(response.Members, JoinGroupMember{
+ MemberID: mid,
+ GroupInstanceID: instanceID,
+ Metadata: m.Metadata,
+ })
+ }
+ }
+
+ resp := h.buildJoinGroupResponse(response)
+ return resp, nil
+}
+
+func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGroupRequest, error) {
+ if len(data) < 8 {
+ return nil, fmt.Errorf("request too short")
+ }
+
+ offset := 0
+ isFlexible := IsFlexibleVersion(11, apiVersion)
+
+ // For flexible versions, skip top-level tagged fields first
+ if isFlexible {
+ // Skip top-level tagged fields (they come before the actual request fields)
+ _, consumed, err := DecodeTaggedFields(data[offset:])
+ if err != nil {
+ return nil, fmt.Errorf("JoinGroup v%d: decode top-level tagged fields: %w", apiVersion, err)
+ }
+ offset += consumed
+ }
+
+ // GroupID (string or compact string) - FIRST field in request
+ var groupID string
+ if isFlexible {
+ // Flexible protocol uses compact strings
+ endIdx := offset + 20 // Show more bytes for debugging
+ if endIdx > len(data) {
+ endIdx = len(data)
+ }
+ groupIDBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 {
+ return nil, fmt.Errorf("invalid group ID compact string")
+ }
+ if groupIDBytes != nil {
+ groupID = string(groupIDBytes)
+ }
+ offset += consumed
+ } else {
+ // Non-flexible protocol uses regular strings
+ if offset+2 > len(data) {
+ return nil, fmt.Errorf("missing group ID length")
+ }
+ groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
+ offset += 2
+ if offset+groupIDLength > len(data) {
+ return nil, fmt.Errorf("invalid group ID length")
+ }
+ groupID = string(data[offset : offset+groupIDLength])
+ offset += groupIDLength
+ }
+
+ // Session timeout (4 bytes)
+ if offset+4 > len(data) {
+ return nil, fmt.Errorf("missing session timeout")
+ }
+ sessionTimeout := int32(binary.BigEndian.Uint32(data[offset:]))
+ offset += 4
+
+ // Rebalance timeout (4 bytes) - for v1+ versions
+ rebalanceTimeout := sessionTimeout // Default to session timeout for v0
+ if apiVersion >= 1 && offset+4 <= len(data) {
+ rebalanceTimeout = int32(binary.BigEndian.Uint32(data[offset:]))
+ offset += 4
+ }
+
+ // MemberID (string or compact string)
+ var memberID string
+ if isFlexible {
+ // Flexible protocol uses compact strings
+ memberIDBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 {
+ return nil, fmt.Errorf("invalid member ID compact string")
+ }
+ if memberIDBytes != nil {
+ memberID = string(memberIDBytes)
+ }
+ offset += consumed
+ } else {
+ // Non-flexible protocol uses regular strings
+ if offset+2 > len(data) {
+ return nil, fmt.Errorf("missing member ID length")
+ }
+ memberIDLength := int(binary.BigEndian.Uint16(data[offset:]))
+ offset += 2
+ if memberIDLength > 0 {
+ if offset+memberIDLength > len(data) {
+ return nil, fmt.Errorf("invalid member ID length")
+ }
+ memberID = string(data[offset : offset+memberIDLength])
+ offset += memberIDLength
+ }
+ }
+
+ // Parse Group Instance ID (nullable string) - for JoinGroup v5+
+ var groupInstanceID string
+ if apiVersion >= 5 {
+ if isFlexible {
+ // FLEXIBLE V6+ FIX: GroupInstanceID is a compact nullable string
+ groupInstanceIDBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 && len(data) > offset {
+ // Check if it's a null compact string (0x00)
+ if data[offset] == 0x00 {
+ groupInstanceID = "" // null
+ offset += 1
+ } else {
+ return nil, fmt.Errorf("JoinGroup v%d: invalid group instance ID compact string", apiVersion)
+ }
+ } else {
+ if groupInstanceIDBytes != nil {
+ groupInstanceID = string(groupInstanceIDBytes)
+ }
+ offset += consumed
+ }
+ } else {
+ // Non-flexible v5: regular nullable string
+ if offset+2 > len(data) {
+ return nil, fmt.Errorf("missing group instance ID length")
+ }
+ instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:]))
+ offset += 2
+
+ if instanceIDLength == -1 {
+ groupInstanceID = "" // null string
+ } else if instanceIDLength >= 0 {
+ if offset+int(instanceIDLength) > len(data) {
+ return nil, fmt.Errorf("invalid group instance ID length")
+ }
+ groupInstanceID = string(data[offset : offset+int(instanceIDLength)])
+ offset += int(instanceIDLength)
+ }
+ }
+ }
+
+ // Parse Protocol Type
+ var protocolType string
+ if isFlexible {
+ // FLEXIBLE V6+ FIX: ProtocolType is a compact string, not regular string
+ endIdx := offset + 10
+ if endIdx > len(data) {
+ endIdx = len(data)
+ }
+ protocolTypeBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 {
+ return nil, fmt.Errorf("JoinGroup v%d: invalid protocol type compact string", apiVersion)
+ }
+ if protocolTypeBytes != nil {
+ protocolType = string(protocolTypeBytes)
+ }
+ offset += consumed
+ } else {
+ // Non-flexible parsing (v0-v5)
+ if len(data) < offset+2 {
+ return nil, fmt.Errorf("JoinGroup request missing protocol type")
+ }
+ protocolTypeLength := binary.BigEndian.Uint16(data[offset : offset+2])
+ offset += 2
+
+ if len(data) < offset+int(protocolTypeLength) {
+ return nil, fmt.Errorf("JoinGroup request protocol type too short")
+ }
+ protocolType = string(data[offset : offset+int(protocolTypeLength)])
+ offset += int(protocolTypeLength)
+ }
+
+ // Parse Group Protocols array
+ var protocolsCount uint32
+ if isFlexible {
+ // FLEXIBLE V6+ FIX: GroupProtocols is a compact array, not regular array
+ compactLength, consumed, err := DecodeCompactArrayLength(data[offset:])
+ if err != nil {
+ return nil, fmt.Errorf("JoinGroup v%d: invalid group protocols compact array: %w", apiVersion, err)
+ }
+ protocolsCount = compactLength
+ offset += consumed
+ } else {
+ // Non-flexible parsing (v0-v5)
+ if len(data) < offset+4 {
+ return nil, fmt.Errorf("JoinGroup request missing group protocols")
+ }
+ protocolsCount = binary.BigEndian.Uint32(data[offset : offset+4])
+ offset += 4
+ }
+
+ protocols := make([]GroupProtocol, 0, protocolsCount)
+
+ for i := uint32(0); i < protocolsCount && offset < len(data); i++ {
+ // Parse protocol name
+ var protocolName string
+ if isFlexible {
+ // FLEXIBLE V6+ FIX: Protocol name is a compact string
+ endIdx := offset + 10
+ if endIdx > len(data) {
+ endIdx = len(data)
+ }
+ protocolNameBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 {
+ return nil, fmt.Errorf("JoinGroup v%d: invalid protocol name compact string", apiVersion)
+ }
+ if protocolNameBytes != nil {
+ protocolName = string(protocolNameBytes)
+ }
+ offset += consumed
+ } else {
+ // Non-flexible parsing
+ if len(data) < offset+2 {
+ break
+ }
+ protocolNameLength := binary.BigEndian.Uint16(data[offset : offset+2])
+ offset += 2
+
+ if len(data) < offset+int(protocolNameLength) {
+ break
+ }
+ protocolName = string(data[offset : offset+int(protocolNameLength)])
+ offset += int(protocolNameLength)
+ }
+
+ // Parse protocol metadata
+ var metadata []byte
+ if isFlexible {
+ // FLEXIBLE V6+ FIX: Protocol metadata is compact bytes
+ metadataLength, consumed, err := DecodeCompactArrayLength(data[offset:])
+ if err != nil {
+ return nil, fmt.Errorf("JoinGroup v%d: invalid protocol metadata compact bytes: %w", apiVersion, err)
+ }
+ offset += consumed
+
+ if metadataLength > 0 && len(data) >= offset+int(metadataLength) {
+ metadata = make([]byte, metadataLength)
+ copy(metadata, data[offset:offset+int(metadataLength)])
+ offset += int(metadataLength)
+ }
+ } else {
+ // Non-flexible parsing
+ if len(data) < offset+4 {
+ break
+ }
+ metadataLength := binary.BigEndian.Uint32(data[offset : offset+4])
+ offset += 4
+
+ if metadataLength > 0 && len(data) >= offset+int(metadataLength) {
+ metadata = make([]byte, metadataLength)
+ copy(metadata, data[offset:offset+int(metadataLength)])
+ offset += int(metadataLength)
+ }
+ }
+
+ // Parse per-protocol tagged fields (v6+)
+ if isFlexible {
+ _, consumed, err := DecodeTaggedFields(data[offset:])
+ if err != nil {
+ // Don't fail - some clients might not send tagged fields
+ } else {
+ offset += consumed
+ }
+ }
+
+ protocols = append(protocols, GroupProtocol{
+ Name: protocolName,
+ Metadata: metadata,
+ })
+
+ }
+
+ // Parse request-level tagged fields (v6+)
+ if isFlexible {
+ if offset < len(data) {
+ _, _, err := DecodeTaggedFields(data[offset:])
+ if err != nil {
+ // Don't fail - some clients might not send tagged fields
+ }
+ }
+ }
+
+ return &JoinGroupRequest{
+ GroupID: groupID,
+ SessionTimeout: sessionTimeout,
+ RebalanceTimeout: rebalanceTimeout,
+ MemberID: memberID,
+ GroupInstanceID: groupInstanceID,
+ ProtocolType: protocolType,
+ GroupProtocols: protocols,
+ }, nil
+}
+
+func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
+ // Debug logging for JoinGroup response
+
+ // Flexible response for v6+
+ if IsFlexibleVersion(11, response.Version) {
+ out := make([]byte, 0, 256)
+
+ // NOTE: Correlation ID and header-level tagged fields are handled by writeResponseWithHeader
+ // Do NOT include them in the response body
+
+ // throttle_time_ms (int32) - versions 2+
+ if response.Version >= 2 {
+ ttms := make([]byte, 4)
+ binary.BigEndian.PutUint32(ttms, uint32(response.ThrottleTimeMs))
+ out = append(out, ttms...)
+ }
+
+ // error_code (int16)
+ eb := make([]byte, 2)
+ binary.BigEndian.PutUint16(eb, uint16(response.ErrorCode))
+ out = append(out, eb...)
+
+ // generation_id (int32)
+ gb := make([]byte, 4)
+ binary.BigEndian.PutUint32(gb, uint32(response.GenerationID))
+ out = append(out, gb...)
+
+ // ProtocolType (v7+ nullable compact string) - NOT in v6!
+ if response.Version >= 7 {
+ pt := "consumer"
+ out = append(out, FlexibleNullableString(&pt)...)
+ }
+
+ // ProtocolName (compact string in v6, nullable compact string in v7+)
+ if response.Version >= 7 {
+ // nullable compact string in v7+
+ if response.ProtocolName == "" {
+ out = append(out, 0) // null
+ } else {
+ out = append(out, FlexibleString(response.ProtocolName)...)
+ }
+ } else {
+ // NON-nullable compact string in v6 - must not be empty!
+ if response.ProtocolName == "" {
+ response.ProtocolName = "range" // fallback to default
+ }
+ out = append(out, FlexibleString(response.ProtocolName)...)
+ }
+
+ // leader (compact string) - NOT nullable
+ if response.Leader == "" {
+ response.Leader = "unknown" // fallback for error cases
+ }
+ out = append(out, FlexibleString(response.Leader)...)
+
+ // SkipAssignment (bool) v9+
+ if response.Version >= 9 {
+ out = append(out, 0) // false
+ }
+
+ // member_id (compact string)
+ out = append(out, FlexibleString(response.MemberID)...)
+
+ // members (compact array)
+ // Compact arrays use length+1 encoding (0 = null, 1 = empty, n+1 = array of length n)
+ out = append(out, EncodeUvarint(uint32(len(response.Members)+1))...)
+ for _, m := range response.Members {
+ // member_id (compact string)
+ out = append(out, FlexibleString(m.MemberID)...)
+ // group_instance_id (compact nullable string)
+ if m.GroupInstanceID == "" {
+ out = append(out, 0)
+ } else {
+ out = append(out, FlexibleString(m.GroupInstanceID)...)
+ }
+ // metadata (compact bytes)
+ // Compact bytes use length+1 encoding (0 = null, 1 = empty, n+1 = bytes of length n)
+ out = append(out, EncodeUvarint(uint32(len(m.Metadata)+1))...)
+ out = append(out, m.Metadata...)
+ // member tagged fields (empty)
+ out = append(out, 0)
+ }
+
+ // top-level tagged fields (empty)
+ out = append(out, 0)
+
+ return out
+ }
+
+ // Legacy (non-flexible) response path
+ // Estimate response size
+ estimatedSize := 0
+ // CorrelationID(4) + (optional throttle 4) + error_code(2) + generation_id(4)
+ if response.Version >= 2 {
+ estimatedSize = 4 + 4 + 2 + 4
+ } else {
+ estimatedSize = 4 + 2 + 4
+ }
+ estimatedSize += 2 + len(response.ProtocolName) // protocol string
+ estimatedSize += 2 + len(response.Leader) // leader string
+ estimatedSize += 2 + len(response.MemberID) // member id string
+ estimatedSize += 4 // members array count
+ for _, member := range response.Members {
+ // MemberID string
+ estimatedSize += 2 + len(member.MemberID)
+ if response.Version >= 5 {
+ // GroupInstanceID string
+ estimatedSize += 2 + len(member.GroupInstanceID)
+ }
+ // Metadata bytes (4 + len)
+ estimatedSize += 4 + len(member.Metadata)
+ }
+
+ result := make([]byte, 0, estimatedSize)
+
+ // NOTE: Correlation ID is handled by writeResponseWithCorrelationID
+ // Do NOT include it in the response body
+
+ // JoinGroup v2 adds throttle_time_ms
+ if response.Version >= 2 {
+ throttleTimeBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling
+ result = append(result, throttleTimeBytes...)
+ }
+
+ // Error code (2 bytes)
+ errorCodeBytes := make([]byte, 2)
+ binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
+ result = append(result, errorCodeBytes...)
+
+ // Generation ID (4 bytes)
+ generationBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(generationBytes, uint32(response.GenerationID))
+ result = append(result, generationBytes...)
+
+ // Group protocol (string)
+ protocolLength := make([]byte, 2)
+ binary.BigEndian.PutUint16(protocolLength, uint16(len(response.ProtocolName)))
+ result = append(result, protocolLength...)
+ result = append(result, []byte(response.ProtocolName)...)
+
+ // Group leader (string)
+ leaderLength := make([]byte, 2)
+ binary.BigEndian.PutUint16(leaderLength, uint16(len(response.Leader)))
+ result = append(result, leaderLength...)
+ result = append(result, []byte(response.Leader)...)
+
+ // Member ID (string)
+ memberIDLength := make([]byte, 2)
+ binary.BigEndian.PutUint16(memberIDLength, uint16(len(response.MemberID)))
+ result = append(result, memberIDLength...)
+ result = append(result, []byte(response.MemberID)...)
+
+ // Members array (4 bytes count + members)
+ memberCountBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(memberCountBytes, uint32(len(response.Members)))
+ result = append(result, memberCountBytes...)
+
+ for _, member := range response.Members {
+ // Member ID (string)
+ memberLength := make([]byte, 2)
+ binary.BigEndian.PutUint16(memberLength, uint16(len(member.MemberID)))
+ result = append(result, memberLength...)
+ result = append(result, []byte(member.MemberID)...)
+
+ if response.Version >= 5 {
+ // Group instance ID (string) - can be empty
+ instanceIDLength := make([]byte, 2)
+ binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID)))
+ result = append(result, instanceIDLength...)
+ if len(member.GroupInstanceID) > 0 {
+ result = append(result, []byte(member.GroupInstanceID)...)
+ }
+ }
+
+ // Metadata (bytes)
+ metadataLength := make([]byte, 4)
+ binary.BigEndian.PutUint32(metadataLength, uint32(len(member.Metadata)))
+ result = append(result, metadataLength...)
+ result = append(result, member.Metadata...)
+ }
+
+ return result
+}
+
+func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode int16, apiVersion uint16) []byte {
+ response := JoinGroupResponse{
+ CorrelationID: correlationID,
+ ThrottleTimeMs: 0,
+ ErrorCode: errorCode,
+ GenerationID: -1,
+ ProtocolName: "range", // Use "range" as default protocol instead of empty string
+ Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field
+ MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field
+ Version: apiVersion,
+ Members: []JoinGroupMember{},
+ }
+
+ return h.buildJoinGroupResponse(response)
+}
+
+// extractSubscriptionFromProtocolsEnhanced uses improved metadata parsing with better error handling
+func (h *Handler) extractSubscriptionFromProtocolsEnhanced(protocols []GroupProtocol) []string {
+ // Analyze protocol metadata for debugging
+ debugInfo := AnalyzeProtocolMetadata(protocols)
+ for _, info := range debugInfo {
+ if info.ParsedOK {
+ } else {
+ }
+ }
+
+ // Extract topics using enhanced parsing
+ topics := ExtractTopicsFromMetadata(protocols, h.getAvailableTopics())
+
+ return topics
+}
+
+func (h *Handler) updateGroupSubscription(group *consumer.ConsumerGroup) {
+ // Update group's subscribed topics from all members
+ group.SubscribedTopics = make(map[string]bool)
+ for _, member := range group.Members {
+ for _, topic := range member.Subscription {
+ group.SubscribedTopics[topic] = true
+ }
+ }
+}
+
+// SyncGroup API (key 14) - Consumer group coordination completion
+// Called by group members after JoinGroup to get partition assignments
+
+// SyncGroupRequest represents a SyncGroup request from a Kafka client
+type SyncGroupRequest struct {
+ GroupID string
+ GenerationID int32
+ MemberID string
+ GroupInstanceID string
+ GroupAssignments []GroupAssignment // Only from group leader
+}
+
+// GroupAssignment represents partition assignment for a group member
+type GroupAssignment struct {
+ MemberID string
+ Assignment []byte // Serialized assignment data
+}
+
+// SyncGroupResponse represents a SyncGroup response to a Kafka client
+type SyncGroupResponse struct {
+ CorrelationID uint32
+ ErrorCode int16
+ Assignment []byte // Serialized partition assignment for this member
+}
+
+// Additional error codes for SyncGroup
+// Error codes for SyncGroup are imported from errors.go
+
+func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+
+ // Parse SyncGroup request
+ request, err := h.parseSyncGroupRequest(requestBody, apiVersion)
+ if err != nil {
+ return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
+ }
+
+ // Validate request
+ if request.GroupID == "" || request.MemberID == "" {
+ return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
+ }
+
+ // Get consumer group
+ group := h.groupCoordinator.GetGroup(request.GroupID)
+ if group == nil {
+ return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
+ }
+
+ group.Mu.Lock()
+ defer group.Mu.Unlock()
+
+ // Update group's last activity
+ group.LastActivity = time.Now()
+
+ // Validate member exists
+ member, exists := group.Members[request.MemberID]
+ if !exists {
+ return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil
+ }
+
+ // Validate generation
+ if request.GenerationID != group.Generation {
+ return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil
+ }
+
+ // Check if this is the group leader with assignments
+ if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 {
+ // Leader is providing assignments - process and store them
+ err = h.processGroupAssignments(group, request.GroupAssignments)
+ if err != nil {
+ return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil
+ }
+
+ // Move group to stable state
+ group.State = consumer.GroupStateStable
+
+ // Mark all members as stable
+ for _, m := range group.Members {
+ m.State = consumer.MemberStateStable
+ }
+ } else if group.State == consumer.GroupStateCompletingRebalance {
+ // Non-leader member waiting for assignments
+ // Assignments should already be processed by leader
+ } else {
+ // Trigger partition assignment using built-in strategy
+ topicPartitions := h.getTopicPartitions(group)
+ group.AssignPartitions(topicPartitions)
+
+ group.State = consumer.GroupStateStable
+ for _, m := range group.Members {
+ m.State = consumer.MemberStateStable
+ }
+ }
+
+ // Get assignment for this member
+ // SCHEMA REGISTRY COMPATIBILITY: Check if this is a Schema Registry client
+ var assignment []byte
+ if request.GroupID == "schema-registry" {
+ // Schema Registry expects JSON format assignment
+ assignment = h.serializeSchemaRegistryAssignment(group, member.Assignment)
+ } else {
+ // Standard Kafka binary assignment format
+ assignment = h.serializeMemberAssignment(member.Assignment)
+ }
+
+ // Build response
+ response := SyncGroupResponse{
+ CorrelationID: correlationID,
+ ErrorCode: ErrorCodeNone,
+ Assignment: assignment,
+ }
+
+ // Log assignment details for debugging
+ assignmentPreview := assignment
+ if len(assignmentPreview) > 100 {
+ assignmentPreview = assignment[:100]
+ }
+
+ resp := h.buildSyncGroupResponse(response, apiVersion)
+ return resp, nil
+}
+
+func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGroupRequest, error) {
+ if len(data) < 8 {
+ return nil, fmt.Errorf("request too short")
+ }
+
+ offset := 0
+ isFlexible := IsFlexibleVersion(14, apiVersion) // SyncGroup API key = 14
+
+ // ADMINCLIENT COMPATIBILITY FIX: Parse top-level tagged fields at the beginning for flexible versions
+ if isFlexible {
+ _, consumed, err := DecodeTaggedFields(data[offset:])
+ if err == nil {
+ offset += consumed
+ } else {
+ }
+ }
+
+ // Parse GroupID
+ var groupID string
+ if isFlexible {
+ // FLEXIBLE V4+ FIX: GroupID is a compact string
+ groupIDBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 {
+ return nil, fmt.Errorf("invalid group ID compact string")
+ }
+ if groupIDBytes != nil {
+ groupID = string(groupIDBytes)
+ }
+ offset += consumed
+ } else {
+ // Non-flexible parsing (v0-v3)
+ groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
+ offset += 2
+ if offset+groupIDLength > len(data) {
+ return nil, fmt.Errorf("invalid group ID length")
+ }
+ groupID = string(data[offset : offset+groupIDLength])
+ offset += groupIDLength
+ }
+
+ // Generation ID (4 bytes) - always fixed-length
+ if offset+4 > len(data) {
+ return nil, fmt.Errorf("missing generation ID")
+ }
+ generationID := int32(binary.BigEndian.Uint32(data[offset:]))
+ offset += 4
+
+ // Parse MemberID
+ var memberID string
+ if isFlexible {
+ // FLEXIBLE V4+ FIX: MemberID is a compact string
+ memberIDBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 {
+ return nil, fmt.Errorf("invalid member ID compact string")
+ }
+ if memberIDBytes != nil {
+ memberID = string(memberIDBytes)
+ }
+ offset += consumed
+ } else {
+ // Non-flexible parsing (v0-v3)
+ if offset+2 > len(data) {
+ return nil, fmt.Errorf("missing member ID length")
+ }
+ memberIDLength := int(binary.BigEndian.Uint16(data[offset:]))
+ offset += 2
+ if offset+memberIDLength > len(data) {
+ return nil, fmt.Errorf("invalid member ID length")
+ }
+ memberID = string(data[offset : offset+memberIDLength])
+ offset += memberIDLength
+ }
+
+ // Parse GroupInstanceID (nullable string) - for SyncGroup v3+
+ var groupInstanceID string
+ if apiVersion >= 3 {
+ if isFlexible {
+ // FLEXIBLE V4+ FIX: GroupInstanceID is a compact nullable string
+ groupInstanceIDBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 && len(data) > offset && data[offset] == 0x00 {
+ groupInstanceID = "" // null
+ offset += 1
+ } else {
+ if groupInstanceIDBytes != nil {
+ groupInstanceID = string(groupInstanceIDBytes)
+ }
+ offset += consumed
+ }
+ } else {
+ // Non-flexible v3: regular nullable string
+ if offset+2 > len(data) {
+ return nil, fmt.Errorf("missing group instance ID length")
+ }
+ instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:]))
+ offset += 2
+
+ if instanceIDLength == -1 {
+ groupInstanceID = "" // null string
+ } else if instanceIDLength >= 0 {
+ if offset+int(instanceIDLength) > len(data) {
+ return nil, fmt.Errorf("invalid group instance ID length")
+ }
+ groupInstanceID = string(data[offset : offset+int(instanceIDLength)])
+ offset += int(instanceIDLength)
+ }
+ }
+ }
+
+ // Parse assignments array if present (leader sends assignments)
+ assignments := make([]GroupAssignment, 0)
+
+ if offset < len(data) {
+ var assignmentsCount uint32
+ if isFlexible {
+ // FLEXIBLE V4+ FIX: Assignments is a compact array
+ compactLength, consumed, err := DecodeCompactArrayLength(data[offset:])
+ if err != nil {
+ } else {
+ assignmentsCount = compactLength
+ offset += consumed
+ }
+ } else {
+ // Non-flexible: regular array with 4-byte length
+ if offset+4 <= len(data) {
+ assignmentsCount = binary.BigEndian.Uint32(data[offset:])
+ offset += 4
+ }
+ }
+
+ // Basic sanity check to avoid very large allocations
+ if assignmentsCount > 0 && assignmentsCount < 10000 {
+ for i := uint32(0); i < assignmentsCount && offset < len(data); i++ {
+ var mID string
+ var assign []byte
+
+ // Parse member_id
+ if isFlexible {
+ // FLEXIBLE V4+ FIX: member_id is a compact string
+ memberIDBytes, consumed := parseCompactString(data[offset:])
+ if consumed == 0 {
+ break
+ }
+ if memberIDBytes != nil {
+ mID = string(memberIDBytes)
+ }
+ offset += consumed
+ } else {
+ // Non-flexible: regular string
+ if offset+2 > len(data) {
+ break
+ }
+ memberLen := int(binary.BigEndian.Uint16(data[offset:]))
+ offset += 2
+ if memberLen < 0 || offset+memberLen > len(data) {
+ break
+ }
+ mID = string(data[offset : offset+memberLen])
+ offset += memberLen
+ }
+
+ // Parse assignment (bytes)
+ if isFlexible {
+ // FLEXIBLE V4+ FIX: assignment is compact bytes
+ assignLength, consumed, err := DecodeCompactArrayLength(data[offset:])
+ if err != nil {
+ break
+ }
+ offset += consumed
+ if assignLength > 0 && offset+int(assignLength) <= len(data) {
+ assign = make([]byte, assignLength)
+ copy(assign, data[offset:offset+int(assignLength)])
+ offset += int(assignLength)
+ }
+
+ // CRITICAL FIX: Flexible format requires tagged fields after each assignment struct
+ if offset < len(data) {
+ _, taggedConsumed, tagErr := DecodeTaggedFields(data[offset:])
+ if tagErr == nil {
+ offset += taggedConsumed
+ }
+ }
+ } else {
+ // Non-flexible: regular bytes
+ if offset+4 > len(data) {
+ break
+ }
+ assignLen := int(binary.BigEndian.Uint32(data[offset:]))
+ offset += 4
+ if assignLen < 0 || offset+assignLen > len(data) {
+ break
+ }
+ if assignLen > 0 {
+ assign = make([]byte, assignLen)
+ copy(assign, data[offset:offset+assignLen])
+ }
+ offset += assignLen
+ }
+
+ assignments = append(assignments, GroupAssignment{MemberID: mID, Assignment: assign})
+ }
+ }
+ }
+
+ // Parse request-level tagged fields (v4+)
+ if isFlexible {
+ if offset < len(data) {
+ _, consumed, err := DecodeTaggedFields(data[offset:])
+ if err != nil {
+ } else {
+ offset += consumed
+ }
+ }
+ }
+
+ return &SyncGroupRequest{
+ GroupID: groupID,
+ GenerationID: generationID,
+ MemberID: memberID,
+ GroupInstanceID: groupInstanceID,
+ GroupAssignments: assignments,
+ }, nil
+}
+
+func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse, apiVersion uint16) []byte {
+ estimatedSize := 16 + len(response.Assignment)
+ result := make([]byte, 0, estimatedSize)
+
+ // NOTE: Correlation ID and header-level tagged fields are handled by writeResponseWithHeader
+ // Do NOT include them in the response body
+
+ // SyncGroup v1+ has throttle_time_ms at the beginning
+ // SyncGroup v0 does NOT include throttle_time_ms
+ if apiVersion >= 1 {
+ // Throttle time (4 bytes, 0 = no throttling)
+ result = append(result, 0, 0, 0, 0)
+ }
+
+ // Error code (2 bytes)
+ errorCodeBytes := make([]byte, 2)
+ binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
+ result = append(result, errorCodeBytes...)
+
+ // SyncGroup v5 adds protocol_type and protocol_name (compact nullable strings)
+ if apiVersion >= 5 {
+ // protocol_type = null (varint 0)
+ result = append(result, 0x00)
+ // protocol_name = null (varint 0)
+ result = append(result, 0x00)
+ }
+
+ // Assignment - FLEXIBLE V4+ FIX
+ if IsFlexibleVersion(14, apiVersion) {
+ // FLEXIBLE FORMAT: Assignment as compact bytes
+ // CRITICAL FIX: Use CompactStringLength for compact bytes (not CompactArrayLength)
+ // Compact bytes use the same encoding as compact strings: 0 = null, 1 = empty, n+1 = length n
+ assignmentLen := len(response.Assignment)
+ if assignmentLen == 0 {
+ // Empty compact bytes = length 0, encoded as 0x01 (0 + 1)
+ result = append(result, 0x01) // Empty compact bytes
+ } else {
+ // Non-empty assignment: encode length + data
+ // Use CompactStringLength which correctly encodes as length+1
+ compactLength := CompactStringLength(assignmentLen)
+ result = append(result, compactLength...)
+ result = append(result, response.Assignment...)
+ }
+ // Add response-level tagged fields for flexible format
+ result = append(result, 0x00) // Empty tagged fields (varint: 0)
+ } else {
+ // NON-FLEXIBLE FORMAT: Assignment as regular bytes
+ assignmentLength := make([]byte, 4)
+ binary.BigEndian.PutUint32(assignmentLength, uint32(len(response.Assignment)))
+ result = append(result, assignmentLength...)
+ result = append(result, response.Assignment...)
+ }
+
+ return result
+}
+
+func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode int16, apiVersion uint16) []byte {
+ response := SyncGroupResponse{
+ CorrelationID: correlationID,
+ ErrorCode: errorCode,
+ Assignment: []byte{},
+ }
+
+ return h.buildSyncGroupResponse(response, apiVersion)
+}
+
+func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignments []GroupAssignment) error {
+ // Apply leader-provided assignments
+ // Clear current assignments
+ for _, m := range group.Members {
+ m.Assignment = nil
+ }
+
+ for _, ga := range assignments {
+ m, ok := group.Members[ga.MemberID]
+ if !ok {
+ // Skip unknown members
+ continue
+ }
+
+ parsed, err := h.parseMemberAssignment(ga.Assignment)
+ if err != nil {
+ return err
+ }
+ m.Assignment = parsed
+ }
+
+ return nil
+}
+
+// parseMemberAssignment decodes ConsumerGroupMemberAssignment bytes into assignments
+func (h *Handler) parseMemberAssignment(data []byte) ([]consumer.PartitionAssignment, error) {
+ if len(data) < 2+4 {
+ // Empty or missing; treat as no assignment
+ return []consumer.PartitionAssignment{}, nil
+ }
+
+ offset := 0
+
+ // Version (2 bytes)
+ if offset+2 > len(data) {
+ return nil, fmt.Errorf("assignment too short for version")
+ }
+ _ = int16(binary.BigEndian.Uint16(data[offset : offset+2]))
+ offset += 2
+
+ // Number of topics (4 bytes)
+ if offset+4 > len(data) {
+ return nil, fmt.Errorf("assignment too short for topics count")
+ }
+ topicsCount := int(binary.BigEndian.Uint32(data[offset:]))
+ offset += 4
+
+ if topicsCount < 0 || topicsCount > 100000 {
+ return nil, fmt.Errorf("unreasonable topics count in assignment: %d", topicsCount)
+ }
+
+ result := make([]consumer.PartitionAssignment, 0)
+
+ for i := 0; i < topicsCount && offset < len(data); i++ {
+ // topic string
+ if offset+2 > len(data) {
+ return nil, fmt.Errorf("assignment truncated reading topic len")
+ }
+ tlen := int(binary.BigEndian.Uint16(data[offset:]))
+ offset += 2
+ if tlen < 0 || offset+tlen > len(data) {
+ return nil, fmt.Errorf("assignment truncated reading topic name")
+ }
+ topic := string(data[offset : offset+tlen])
+ offset += tlen
+
+ // partitions array length
+ if offset+4 > len(data) {
+ return nil, fmt.Errorf("assignment truncated reading partitions len")
+ }
+ numPartitions := int(binary.BigEndian.Uint32(data[offset:]))
+ offset += 4
+ if numPartitions < 0 || numPartitions > 1000000 {
+ return nil, fmt.Errorf("unreasonable partitions count: %d", numPartitions)
+ }
+
+ for p := 0; p < numPartitions; p++ {
+ if offset+4 > len(data) {
+ return nil, fmt.Errorf("assignment truncated reading partition id")
+ }
+ pid := int32(binary.BigEndian.Uint32(data[offset:]))
+ offset += 4
+ result = append(result, consumer.PartitionAssignment{Topic: topic, Partition: pid})
+ }
+ }
+
+ // Optional UserData: bytes length + data. Safe to ignore.
+ // If present but truncated, ignore silently.
+
+ return result, nil
+}
+
+func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][]int32 {
+ topicPartitions := make(map[string][]int32)
+
+ // Get partition info for all subscribed topics
+ for topic := range group.SubscribedTopics {
+ // Check if topic exists using SeaweedMQ handler
+ if h.seaweedMQHandler.TopicExists(topic) {
+ // For now, assume 1 partition per topic (can be extended later)
+ // In a real implementation, this would query SeaweedMQ for actual partition count
+ partitions := []int32{0}
+ topicPartitions[topic] = partitions
+ } else {
+ // Default to single partition if topic not found
+ topicPartitions[topic] = []int32{0}
+ }
+ }
+
+ return topicPartitions
+}
+
+func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGroup, assignments []consumer.PartitionAssignment) []byte {
+ // Schema Registry expects a JSON assignment in the format:
+ // {"error":0,"master":"member-id","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":"7.4.0-ce"}}
+
+ // CRITICAL FIX: Extract the actual leader's identity from the leader's metadata
+ // to avoid localhost/hostname mismatch that causes Schema Registry to forward
+ // requests to itself
+ leaderMember, exists := group.Members[group.Leader]
+ if !exists {
+ // Fallback if leader not found (shouldn't happen)
+ jsonAssignment := `{"error":0,"master":"","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":1}}`
+ return []byte(jsonAssignment)
+ }
+
+ // Parse the leader's metadata to extract the Schema Registry identity
+ // The metadata is the serialized SchemaRegistryIdentity JSON
+ var identity map[string]interface{}
+ err := json.Unmarshal(leaderMember.Metadata, &identity)
+ if err != nil {
+ // Fallback to basic assignment
+ jsonAssignment := fmt.Sprintf(`{"error":0,"master":"%s","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":1}}`, group.Leader)
+ return []byte(jsonAssignment)
+ }
+
+ // Extract fields with defaults
+ host := "localhost"
+ port := 8081
+ scheme := "http"
+ version := 1
+ leaderEligibility := true
+
+ if h, ok := identity["host"].(string); ok {
+ host = h
+ }
+ if p, ok := identity["port"].(float64); ok {
+ port = int(p)
+ }
+ if s, ok := identity["scheme"].(string); ok {
+ scheme = s
+ }
+ if v, ok := identity["version"].(float64); ok {
+ version = int(v)
+ }
+ if le, ok := identity["master_eligibility"].(bool); ok {
+ leaderEligibility = le
+ }
+
+ // Build the assignment JSON with the actual leader identity
+ jsonAssignment := fmt.Sprintf(`{"error":0,"master":"%s","master_identity":{"host":"%s","port":%d,"master_eligibility":%t,"scheme":"%s","version":%d}}`,
+ group.Leader, host, port, leaderEligibility, scheme, version)
+
+ return []byte(jsonAssignment)
+}
+
+func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssignment) []byte {
+ // Build ConsumerGroupMemberAssignment format exactly as Sarama expects:
+ // Version(2) + Topics array + UserData bytes
+
+ // Group assignments by topic
+ topicAssignments := make(map[string][]int32)
+ for _, assignment := range assignments {
+ topicAssignments[assignment.Topic] = append(topicAssignments[assignment.Topic], assignment.Partition)
+ }
+
+ result := make([]byte, 0, 64)
+
+ // Version (2 bytes) - use version 1
+ result = append(result, 0, 1)
+
+ // Number of topics (4 bytes) - array length
+ numTopicsBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(numTopicsBytes, uint32(len(topicAssignments)))
+ result = append(result, numTopicsBytes...)
+
+ // Get sorted topic names to ensure deterministic order
+ topics := make([]string, 0, len(topicAssignments))
+ for topic := range topicAssignments {
+ topics = append(topics, topic)
+ }
+ sort.Strings(topics)
+
+ // Topics - each topic follows Kafka string + int32 array format
+ for _, topic := range topics {
+ partitions := topicAssignments[topic]
+ // Topic name as Kafka string: length(2) + content
+ topicLenBytes := make([]byte, 2)
+ binary.BigEndian.PutUint16(topicLenBytes, uint16(len(topic)))
+ result = append(result, topicLenBytes...)
+ result = append(result, []byte(topic)...)
+
+ // Partitions as int32 array: length(4) + elements
+ numPartitionsBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(numPartitionsBytes, uint32(len(partitions)))
+ result = append(result, numPartitionsBytes...)
+
+ // Partitions (4 bytes each)
+ for _, partition := range partitions {
+ partitionBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(partitionBytes, uint32(partition))
+ result = append(result, partitionBytes...)
+ }
+ }
+
+ // UserData as Kafka bytes: length(4) + data (empty in our case)
+ // For empty user data, just put length = 0
+ result = append(result, 0, 0, 0, 0)
+
+ return result
+}
+
+// getAvailableTopics returns list of available topics for subscription metadata
+func (h *Handler) getAvailableTopics() []string {
+ return h.seaweedMQHandler.ListTopics()
+}