1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
|
package protocol
import (
"encoding/binary"
"fmt"
"net"
"sync"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
// ConsumerProtocolMetadata is the decoded form of the consumer protocol
// metadata blob attached to a single assignment strategy. It is the value
// produced by ParseConsumerProtocolMetadata.
type ConsumerProtocolMetadata struct {
Version int16 // Protocol metadata version (first two bytes of the blob, big-endian)
Topics []string // Subscribed topic names; the parser drops empty names
UserData []byte // Optional user data; nil or empty when the blob carries none
AssignmentStrategy string // Assignment strategy name the metadata was parsed for
}
// ConnectionContext holds connection-specific information for requests.
//
// The struct contains a sync.Map field by value, so a ConnectionContext should
// be passed around by pointer and not copied once it is in use.
type ConnectionContext struct {
RemoteAddr net.Addr // Client's remote address
LocalAddr net.Addr // Server's local address
ConnectionID string // Connection identifier
ClientID string // Kafka client ID from request headers
ConsumerGroup string // Consumer group (set by JoinGroup)
MemberID string // Consumer group member ID (set by JoinGroup)
// Per-connection broker client for isolated gRPC streams.
// Each Kafka connection MUST have its own gRPC streams to avoid interference
// when multiple consumers or requests are active on different connections.
BrokerClient interface{} // Will be set to *integration.BrokerClient
// Persistent partition readers - one goroutine per topic-partition that maintains position
// and streams forward, eliminating repeated offset lookups and reducing broker CPU load.
// The map is unexported; it is only reachable from inside this package.
partitionReaders sync.Map // map[TopicPartitionKey]*partitionReader
}
// ExtractClientHost returns the host part (IP or hostname, without the port)
// of the client's remote address.
//
// It returns "unknown" when connCtx is nil, when no remote address is set, or
// when the remote address is a *net.TCPAddr that is nil or carries no IP.
// For any other address type the string form is split with
// net.SplitHostPort; if that fails, the full string form is returned as is.
func ExtractClientHost(connCtx *ConnectionContext) string {
	if connCtx == nil || connCtx.RemoteAddr == nil {
		return "unknown"
	}

	// Fast path: a TCP address already exposes the IP separately.
	if tcpAddr, ok := connCtx.RemoteAddr.(*net.TCPAddr); ok {
		// A typed-nil *net.TCPAddr stored in the interface passes the nil
		// check above but would panic on field access, and an empty IP
		// would stringify as "<nil>". Report both as unknown.
		if tcpAddr == nil || len(tcpAddr.IP) == 0 {
			return "unknown"
		}
		return tcpAddr.IP.String()
	}

	// Fallback: parse the string representation as host:port.
	addrStr := connCtx.RemoteAddr.String()
	if host, _, err := net.SplitHostPort(addrStr); err == nil {
		return host
	}

	// Last resort: the address has no recognizable port, return it whole.
	return addrStr
}
// ParseConsumerProtocolMetadata decodes the consumer protocol metadata blob
// sent for one assignment strategy.
//
// Layout read here (all integers big-endian):
//
//	version      2 bytes
//	topic count  4 bytes
//	topics       repeated: 2-byte length followed by the name bytes
//	user data    optional: 4-byte length followed by the bytes;
//	             a length of 0xFFFFFFFF (-1) means null
//
// strategyName is stored unchanged in the result's AssignmentStrategy.
//
// The parser is intentionally lenient in these cases, none of which is an error:
//   - fewer than 2 bytes of metadata yields version 0 with empty Topics and UserData;
//   - if the payload ends exactly on a topic boundary before the advertised
//     count is reached, the topics read so far are returned;
//   - topics with an empty name are skipped;
//   - a user data length that exceeds the remaining bytes leaves UserData nil.
//
// An error is returned when the topic count is missing or above 10000, when a
// topic name length is above 1000 or its bytes are truncated, or when the
// user data length is above 100000. The result is nil whenever an error is
// returned.
func ParseConsumerProtocolMetadata(metadata []byte, strategyName string) (*ConsumerProtocolMetadata, error) {
	const (
		maxTopics       = 10000
		maxTopicNameLen = 1000
		maxUserDataLen  = 100000 // 100KB limit
		nullLength      = 0xFFFFFFFF
	)

	if len(metadata) < 2 {
		return &ConsumerProtocolMetadata{
			Version:            0,
			Topics:             []string{},
			UserData:           []byte{},
			AssignmentStrategy: strategyName,
		}, nil
	}

	result := &ConsumerProtocolMetadata{
		AssignmentStrategy: strategyName,
	}
	offset := 0

	// Parse version (2 bytes); the length was already checked above.
	result.Version = int16(binary.BigEndian.Uint16(metadata[offset : offset+2]))
	offset += 2

	// Parse topics array
	if len(metadata) < offset+4 {
		return nil, fmt.Errorf("metadata too short for topics count")
	}
	topicsCount := binary.BigEndian.Uint32(metadata[offset : offset+4])
	offset += 4

	// Validate topics count (reasonable limit)
	if topicsCount > maxTopics {
		return nil, fmt.Errorf("unreasonable topics count: %d", topicsCount)
	}

	// The count comes straight off the wire. Every topic occupies at least its
	// 2-byte length prefix, so never reserve room for more entries than the
	// remaining payload could possibly encode.
	capacity := int(topicsCount)
	if fits := (len(metadata) - offset) / 2; fits < capacity {
		capacity = fits
	}
	result.Topics = make([]string, 0, capacity)

	for i := uint32(0); i < topicsCount && offset < len(metadata); i++ {
		// Parse topic name length
		if len(metadata) < offset+2 {
			return nil, fmt.Errorf("metadata too short for topic %d name length", i)
		}
		topicNameLength := int(binary.BigEndian.Uint16(metadata[offset : offset+2]))
		offset += 2

		// Validate topic name length
		if topicNameLength > maxTopicNameLen {
			return nil, fmt.Errorf("unreasonable topic name length: %d", topicNameLength)
		}
		if len(metadata) < offset+topicNameLength {
			return nil, fmt.Errorf("metadata too short for topic %d name data", i)
		}

		topicName := string(metadata[offset : offset+topicNameLength])
		offset += topicNameLength

		// Skip empty topic names
		if topicName == "" {
			continue
		}
		result.Topics = append(result.Topics, topicName)
	}

	// Parse user data if remaining bytes exist
	if len(metadata) < offset+4 {
		return result, nil
	}
	userDataLength := binary.BigEndian.Uint32(metadata[offset : offset+4])
	offset += 4

	// Handle -1 (0xFFFFFFFF) as null/empty user data (Kafka protocol convention)
	if userDataLength == nullLength {
		result.UserData = []byte{}
		return result, nil
	}

	// Validate user data length
	if userDataLength > maxUserDataLen {
		return nil, fmt.Errorf("unreasonable user data length: %d", userDataLength)
	}

	// Copy so the result does not alias the caller's buffer. Truncated user
	// data is tolerated and simply left unset.
	if end := offset + int(userDataLength); len(metadata) >= end {
		result.UserData = make([]byte, userDataLength)
		copy(result.UserData, metadata[offset:end])
	}

	return result, nil
}
// ValidateAssignmentStrategy reports whether strategy is one of the assignment
// strategy names accepted by this package: consumer.ProtocolNameRange,
// consumer.ProtocolNameRoundRobin, consumer.ProtocolNameSticky or
// consumer.ProtocolNameCooperativeSticky. Any other value, including the
// empty string, is rejected.
func ValidateAssignmentStrategy(strategy string) bool {
	switch strategy {
	case consumer.ProtocolNameRange,
		consumer.ProtocolNameRoundRobin,
		consumer.ProtocolNameSticky,
		consumer.ProtocolNameCooperativeSticky: // Incremental cooperative rebalancing (Kafka 2.4+)
		return true
	default:
		return false
	}
}
// ExtractTopicsFromMetadata returns the subscribed topics found in the first
// protocol entry that (a) names a strategy accepted by
// ValidateAssignmentStrategy, (b) parses without error and (c) lists at least
// one topic. Entries that fail any of these are passed over silently.
//
// When no entry qualifies, fallbackTopics is returned if it is non-empty;
// otherwise an empty, non-nil slice is returned (the consumer may be using a
// pattern subscription).
func ExtractTopicsFromMetadata(protocols []GroupProtocol, fallbackTopics []string) []string {
	for i := range protocols {
		candidate := protocols[i]
		if !ValidateAssignmentStrategy(candidate.Name) {
			continue
		}

		meta, parseErr := ParseConsumerProtocolMetadata(candidate.Metadata, candidate.Name)
		if parseErr == nil && len(meta.Topics) > 0 {
			return meta.Topics
		}
	}

	// Nothing usable in the metadata: use the caller's fallback, if any.
	if len(fallbackTopics) == 0 {
		return []string{}
	}
	return fallbackTopics
}
// SelectBestProtocol chooses the assignment protocol to use for a member.
//
// protocols is the list offered by the client; only entries accepted by
// ValidateAssignmentStrategy are considered. groupProtocols is the list the
// group already supports; an empty list means the group imposes no constraint.
//
// Selection order:
//  1. The highest-priority protocol (sticky > roundrobin > range) that the
//     client offers and, if the group is constrained, the group also lists.
//  2. Group constrained: the first validated client protocol, in client order,
//     that the group also lists. This covers validated strategies that are
//     not in the priority list, such as cooperative-sticky.
//  3. Group constrained and the client offers nothing validated: range, if
//     the group lists it.
//  4. Group constrained and nothing in common: "" (no compatible protocol).
//  5. Group unconstrained: the first validated client protocol, or range as a
//     last resort.
func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) string {
	// Priority order: sticky > roundrobin > range
	protocolPriority := []string{consumer.ProtocolNameSticky, consumer.ProtocolNameRoundRobin, consumer.ProtocolNameRange}

	// Supported protocols offered by the client.
	clientProtocols := make(map[string]bool, len(protocols))
	for _, protocol := range protocols {
		if ValidateAssignmentStrategy(protocol.Name) {
			clientProtocols[protocol.Name] = true
		}
	}

	// Protocols the group already supports.
	groupProtocolSet := make(map[string]bool, len(groupProtocols))
	for _, name := range groupProtocols {
		groupProtocolSet[name] = true
	}
	groupConstrained := len(groupProtocols) > 0

	// Select highest priority protocol that both client and group support.
	for _, preferred := range protocolPriority {
		if clientProtocols[preferred] && (!groupConstrained || groupProtocolSet[preferred]) {
			return preferred
		}
	}

	if groupConstrained {
		// The priority list does not contain every validated strategy, so a
		// common protocol may still exist. Honor the client's own ordering.
		for _, protocol := range protocols {
			if clientProtocols[protocol.Name] && groupProtocolSet[protocol.Name] {
				return protocol.Name
			}
		}

		// Special fallback: the client supports nothing we validate, but the
		// group supports range.
		if len(clientProtocols) == 0 && groupProtocolSet[consumer.ProtocolNameRange] {
			return consumer.ProtocolNameRange
		}

		// Empty string signals that no compatible protocol was found.
		return ""
	}

	// Group has no existing protocols: first supported protocol from the client.
	for _, protocol := range protocols {
		if clientProtocols[protocol.Name] {
			return protocol.Name
		}
	}

	// Last resort
	return consumer.ProtocolNameRange
}
// ProtocolMetadataDebugInfo holds debug information about one protocol
// metadata entry, as produced by AnalyzeProtocolMetadata. The parsed fields
// (Version, TopicCount, Topics, UserDataSize) are only filled in when
// ParsedOK is true; ParseError is only filled in when it is false.
type ProtocolMetadataDebugInfo struct {
Strategy string // Protocol (assignment strategy) name of the entry
Version int16 // Metadata version reported by the parser
TopicCount int // Number of topics the parser returned
Topics []string // Topic names the parser returned
UserDataSize int // Length in bytes of the parsed user data
ParsedOK bool // True when the metadata parsed without error
ParseError string // Parser error text when ParsedOK is false
}
// AnalyzeProtocolMetadata parses the metadata of every offered protocol and
// reports the outcome, one entry per protocol in input order. A protocol
// whose metadata fails to parse is still reported: its entry carries the
// strategy name, ParsedOK=false and the error text. The returned slice is
// never nil.
func AnalyzeProtocolMetadata(protocols []GroupProtocol) []ProtocolMetadataDebugInfo {
	report := make([]ProtocolMetadataDebugInfo, len(protocols))

	for i := range protocols {
		entry := &report[i]
		entry.Strategy = protocols[i].Name

		meta, parseErr := ParseConsumerProtocolMetadata(protocols[i].Metadata, protocols[i].Name)
		if parseErr != nil {
			// ParsedOK keeps its zero value (false) for failed entries.
			entry.ParseError = parseErr.Error()
			continue
		}

		entry.ParsedOK = true
		entry.Version = meta.Version
		entry.Topics = meta.Topics
		entry.TopicCount = len(meta.Topics)
		entry.UserDataSize = len(meta.UserData)
	}

	return report
}
|