aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/consumer_group_metadata.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/protocol/consumer_group_metadata.go')
-rw-r--r--weed/mq/kafka/protocol/consumer_group_metadata.go332
1 files changed, 332 insertions, 0 deletions
diff --git a/weed/mq/kafka/protocol/consumer_group_metadata.go b/weed/mq/kafka/protocol/consumer_group_metadata.go
new file mode 100644
index 000000000..f0c20a312
--- /dev/null
+++ b/weed/mq/kafka/protocol/consumer_group_metadata.go
@@ -0,0 +1,332 @@
+package protocol
+
+import (
+ "encoding/binary"
+ "fmt"
+ "net"
+ "strings"
+ "sync"
+)
+
+// ConsumerProtocolMetadata represents parsed consumer protocol metadata
+type ConsumerProtocolMetadata struct {
+ Version int16 // Protocol metadata version
+ Topics []string // Subscribed topic names
+ UserData []byte // Optional user data
+ AssignmentStrategy string // Preferred assignment strategy
+}
+
+// ConnectionContext holds connection-specific information for requests
+type ConnectionContext struct {
+ RemoteAddr net.Addr // Client's remote address
+ LocalAddr net.Addr // Server's local address
+ ConnectionID string // Connection identifier
+ ClientID string // Kafka client ID from request headers
+ ConsumerGroup string // Consumer group (set by JoinGroup)
+ MemberID string // Consumer group member ID (set by JoinGroup)
+ // Per-connection broker client for isolated gRPC streams
+ // CRITICAL: Each Kafka connection MUST have its own gRPC streams to avoid interference
+ // when multiple consumers or requests are active on different connections
+ BrokerClient interface{} // Will be set to *integration.BrokerClient
+
+ // Persistent partition readers - one goroutine per topic-partition that maintains position
+ // and streams forward, eliminating repeated offset lookups and reducing broker CPU load
+ partitionReaders sync.Map // map[TopicPartitionKey]*partitionReader
+}
+
+// ExtractClientHost extracts the client hostname/IP from connection context
+func ExtractClientHost(connCtx *ConnectionContext) string {
+ if connCtx == nil || connCtx.RemoteAddr == nil {
+ return "unknown"
+ }
+
+ // Extract host portion from address
+ if tcpAddr, ok := connCtx.RemoteAddr.(*net.TCPAddr); ok {
+ return tcpAddr.IP.String()
+ }
+
+ // Fallback: parse string representation
+ addrStr := connCtx.RemoteAddr.String()
+ if host, _, err := net.SplitHostPort(addrStr); err == nil {
+ return host
+ }
+
+ // Last resort: return full address
+ return addrStr
+}
+
+// ParseConsumerProtocolMetadata parses consumer protocol metadata with enhanced error handling
+func ParseConsumerProtocolMetadata(metadata []byte, strategyName string) (*ConsumerProtocolMetadata, error) {
+ if len(metadata) < 2 {
+ return &ConsumerProtocolMetadata{
+ Version: 0,
+ Topics: []string{},
+ UserData: []byte{},
+ AssignmentStrategy: strategyName,
+ }, nil
+ }
+
+ result := &ConsumerProtocolMetadata{
+ AssignmentStrategy: strategyName,
+ }
+
+ offset := 0
+
+ // Parse version (2 bytes)
+ if len(metadata) < offset+2 {
+ return nil, fmt.Errorf("metadata too short for version field")
+ }
+ result.Version = int16(binary.BigEndian.Uint16(metadata[offset : offset+2]))
+ offset += 2
+
+ // Parse topics array
+ if len(metadata) < offset+4 {
+ return nil, fmt.Errorf("metadata too short for topics count")
+ }
+ topicsCount := binary.BigEndian.Uint32(metadata[offset : offset+4])
+ offset += 4
+
+ // Validate topics count (reasonable limit)
+ if topicsCount > 10000 {
+ return nil, fmt.Errorf("unreasonable topics count: %d", topicsCount)
+ }
+
+ result.Topics = make([]string, 0, topicsCount)
+
+ for i := uint32(0); i < topicsCount && offset < len(metadata); i++ {
+ // Parse topic name length
+ if len(metadata) < offset+2 {
+ return nil, fmt.Errorf("metadata too short for topic %d name length", i)
+ }
+ topicNameLength := binary.BigEndian.Uint16(metadata[offset : offset+2])
+ offset += 2
+
+ // Validate topic name length
+ if topicNameLength > 1000 {
+ return nil, fmt.Errorf("unreasonable topic name length: %d", topicNameLength)
+ }
+
+ if len(metadata) < offset+int(topicNameLength) {
+ return nil, fmt.Errorf("metadata too short for topic %d name data", i)
+ }
+
+ topicName := string(metadata[offset : offset+int(topicNameLength)])
+ offset += int(topicNameLength)
+
+ // Validate topic name (basic validation)
+ if len(topicName) == 0 {
+ continue // Skip empty topic names
+ }
+
+ result.Topics = append(result.Topics, topicName)
+ }
+
+ // Parse user data if remaining bytes exist
+ if len(metadata) >= offset+4 {
+ userDataLength := binary.BigEndian.Uint32(metadata[offset : offset+4])
+ offset += 4
+
+ // Handle -1 (0xFFFFFFFF) as null/empty user data (Kafka protocol convention)
+ if userDataLength == 0xFFFFFFFF {
+ result.UserData = []byte{}
+ return result, nil
+ }
+
+ // Validate user data length
+ if userDataLength > 100000 { // 100KB limit
+ return nil, fmt.Errorf("unreasonable user data length: %d", userDataLength)
+ }
+
+ if len(metadata) >= offset+int(userDataLength) {
+ result.UserData = make([]byte, userDataLength)
+ copy(result.UserData, metadata[offset:offset+int(userDataLength)])
+ }
+ }
+
+ return result, nil
+}
+
+// GenerateConsumerProtocolMetadata creates protocol metadata for a consumer subscription
+func GenerateConsumerProtocolMetadata(topics []string, userData []byte) []byte {
+ // Calculate total size needed
+ size := 2 + 4 + 4 // version + topics_count + user_data_length
+ for _, topic := range topics {
+ size += 2 + len(topic) // topic_name_length + topic_name
+ }
+ size += len(userData)
+
+ metadata := make([]byte, 0, size)
+
+ // Version (2 bytes) - use version 1
+ metadata = append(metadata, 0, 1)
+
+ // Topics count (4 bytes)
+ topicsCount := make([]byte, 4)
+ binary.BigEndian.PutUint32(topicsCount, uint32(len(topics)))
+ metadata = append(metadata, topicsCount...)
+
+ // Topics (string array)
+ for _, topic := range topics {
+ topicLen := make([]byte, 2)
+ binary.BigEndian.PutUint16(topicLen, uint16(len(topic)))
+ metadata = append(metadata, topicLen...)
+ metadata = append(metadata, []byte(topic)...)
+ }
+
+ // UserData length and data (4 bytes + data)
+ userDataLen := make([]byte, 4)
+ binary.BigEndian.PutUint32(userDataLen, uint32(len(userData)))
+ metadata = append(metadata, userDataLen...)
+ metadata = append(metadata, userData...)
+
+ return metadata
+}
+
+// ValidateAssignmentStrategy checks if an assignment strategy is supported
+func ValidateAssignmentStrategy(strategy string) bool {
+ supportedStrategies := map[string]bool{
+ "range": true,
+ "roundrobin": true,
+ "sticky": true,
+ "cooperative-sticky": false, // Not yet implemented
+ }
+
+ return supportedStrategies[strategy]
+}
+
+// ExtractTopicsFromMetadata extracts topic list from protocol metadata with fallback
+func ExtractTopicsFromMetadata(protocols []GroupProtocol, fallbackTopics []string) []string {
+ for _, protocol := range protocols {
+ if ValidateAssignmentStrategy(protocol.Name) {
+ parsed, err := ParseConsumerProtocolMetadata(protocol.Metadata, protocol.Name)
+ if err != nil {
+ continue
+ }
+
+ if len(parsed.Topics) > 0 {
+ return parsed.Topics
+ }
+ }
+ }
+
+ // Fallback to provided topics or default
+ if len(fallbackTopics) > 0 {
+ return fallbackTopics
+ }
+
+ return []string{"test-topic"}
+}
+
+// SelectBestProtocol chooses the best assignment protocol from available options
+func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) string {
+ // Priority order: sticky > roundrobin > range
+ protocolPriority := []string{"sticky", "roundrobin", "range"}
+
+ // Find supported protocols in client's list
+ clientProtocols := make(map[string]bool)
+ for _, protocol := range protocols {
+ if ValidateAssignmentStrategy(protocol.Name) {
+ clientProtocols[protocol.Name] = true
+ }
+ }
+
+ // Find supported protocols in group's list
+ groupProtocolSet := make(map[string]bool)
+ for _, protocol := range groupProtocols {
+ groupProtocolSet[protocol] = true
+ }
+
+ // Select highest priority protocol that both client and group support
+ for _, preferred := range protocolPriority {
+ if clientProtocols[preferred] && (len(groupProtocols) == 0 || groupProtocolSet[preferred]) {
+ return preferred
+ }
+ }
+
+ // If group has existing protocols, find a protocol supported by both client and group
+ if len(groupProtocols) > 0 {
+ // Try to find a protocol that both client and group support
+ for _, preferred := range protocolPriority {
+ if clientProtocols[preferred] && groupProtocolSet[preferred] {
+ return preferred
+ }
+ }
+
+ // No common protocol found - handle special fallback case
+ // If client supports nothing we validate, but group supports "range", use "range"
+ if len(clientProtocols) == 0 && groupProtocolSet["range"] {
+ return "range"
+ }
+
+ // Return empty string to indicate no compatible protocol found
+ return ""
+ }
+
+ // Fallback to first supported protocol from client (only when group has no existing protocols)
+ for _, protocol := range protocols {
+ if ValidateAssignmentStrategy(protocol.Name) {
+ return protocol.Name
+ }
+ }
+
+ // Last resort
+ return "range"
+}
+
+// SanitizeConsumerGroupID validates and sanitizes consumer group ID
+func SanitizeConsumerGroupID(groupID string) (string, error) {
+ if len(groupID) == 0 {
+ return "", fmt.Errorf("empty group ID")
+ }
+
+ if len(groupID) > 255 {
+ return "", fmt.Errorf("group ID too long: %d characters (max 255)", len(groupID))
+ }
+
+ // Basic validation: no control characters
+ for _, char := range groupID {
+ if char < 32 || char == 127 {
+ return "", fmt.Errorf("group ID contains invalid characters")
+ }
+ }
+
+ return strings.TrimSpace(groupID), nil
+}
+
+// ProtocolMetadataDebugInfo returns debug information about protocol metadata
+type ProtocolMetadataDebugInfo struct {
+ Strategy string
+ Version int16
+ TopicCount int
+ Topics []string
+ UserDataSize int
+ ParsedOK bool
+ ParseError string
+}
+
+// AnalyzeProtocolMetadata provides detailed debug information about protocol metadata
+func AnalyzeProtocolMetadata(protocols []GroupProtocol) []ProtocolMetadataDebugInfo {
+ result := make([]ProtocolMetadataDebugInfo, 0, len(protocols))
+
+ for _, protocol := range protocols {
+ info := ProtocolMetadataDebugInfo{
+ Strategy: protocol.Name,
+ }
+
+ parsed, err := ParseConsumerProtocolMetadata(protocol.Metadata, protocol.Name)
+ if err != nil {
+ info.ParsedOK = false
+ info.ParseError = err.Error()
+ } else {
+ info.ParsedOK = true
+ info.Version = parsed.Version
+ info.TopicCount = len(parsed.Topics)
+ info.Topics = parsed.Topics
+ info.UserDataSize = len(parsed.UserData)
+ }
+
+ result = append(result, info)
+ }
+
+ return result
+}