path: root/weed/mq/kafka/protocol/consumer_group_metadata.go
blob: 1c934238f4bd6a9cdb1bdd2d96728765189faada
package protocol

import (
	"encoding/binary"
	"fmt"
	"net"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)

// ConsumerProtocolMetadata represents parsed consumer protocol metadata
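// (the subscription metadata carried in a JoinGroup request's protocol entries)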
type ConsumerProtocolMetadata struct {
	Version            int16    // Protocol metadata version
	Topics             []string // Subscribed topic names
	UserData           []byte   // Optional user data
	AssignmentStrategy string   // Preferred assignment strategy
}

// ConnectionContext holds connection-specific information for requests
type ConnectionContext struct {
	RemoteAddr    net.Addr // Client's remote address
	LocalAddr     net.Addr // Server's local address
	ConnectionID  string   // Connection identifier
	ClientID      string   // Kafka client ID from request headers
	ConsumerGroup string   // Consumer group (set by JoinGroup)
	MemberID      string   // Consumer group member ID (set by JoinGroup)
	// Per-connection broker client for isolated gRPC streams
	// Each Kafka connection MUST have its own gRPC streams to avoid interference
	// when multiple consumers or requests are active on different connections
	BrokerClient interface{} // Will be set to *integration.BrokerClient

	// Persistent partition readers - one goroutine per topic-partition that maintains position
	// and streams forward, eliminating repeated offset lookups and reducing broker CPU load
	partitionReaders sync.Map // map[TopicPartitionKey]*partitionReader
}

// ExtractClientHost extracts the client hostname/IP from connection context
func ExtractClientHost(connCtx *ConnectionContext) string {
	if connCtx == nil || connCtx.RemoteAddr == nil {
		return "unknown"
	}

	// Extract host portion from address
	if tcpAddr, ok := connCtx.RemoteAddr.(*net.TCPAddr); ok {
		return tcpAddr.IP.String()
	}

	// Fallback: parse string representation
	addrStr := connCtx.RemoteAddr.String()
	if host, _, err := net.SplitHostPort(addrStr); err == nil {
		return host
	}

	// Last resort: return full address
	return addrStr
}

// ParseConsumerProtocolMetadata parses consumer protocol metadata with enhanced error handling
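// The layout this parser expects is:
//
//	version    : int16
//	topics     : int32 count, then for each topic an int16 length followed by the name bytes
//	user_data  : int32 length (0xFFFFFFFF, i.e. -1, means null), followed by that many bytes
//
// For illustration, a version-0 subscription to the single topic "events" with null
// user data would be:
//
//	00 00                      version = 0
//	00 00 00 01                topics count = 1
//	00 06 65 76 65 6e 74 73    length 6, "events"
//	ff ff ff ff                user data length = -1 (null)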
func ParseConsumerProtocolMetadata(metadata []byte, strategyName string) (*ConsumerProtocolMetadata, error) {
	if len(metadata) < 2 {
		return &ConsumerProtocolMetadata{
			Version:            0,
			Topics:             []string{},
			UserData:           []byte{},
			AssignmentStrategy: strategyName,
		}, nil
	}

	result := &ConsumerProtocolMetadata{
		AssignmentStrategy: strategyName,
	}

	offset := 0

	// Parse version (2 bytes)
	if len(metadata) < offset+2 {
		return nil, fmt.Errorf("metadata too short for version field")
	}
	result.Version = int16(binary.BigEndian.Uint16(metadata[offset : offset+2]))
	offset += 2

	// Parse topics array
	if len(metadata) < offset+4 {
		return nil, fmt.Errorf("metadata too short for topics count")
	}
	topicsCount := binary.BigEndian.Uint32(metadata[offset : offset+4])
	offset += 4

	// Validate topics count (reasonable limit)
	if topicsCount > 10000 {
		return nil, fmt.Errorf("unreasonable topics count: %d", topicsCount)
	}

	result.Topics = make([]string, 0, topicsCount)

	for i := uint32(0); i < topicsCount && offset < len(metadata); i++ {
		// Parse topic name length
		if len(metadata) < offset+2 {
			return nil, fmt.Errorf("metadata too short for topic %d name length", i)
		}
		topicNameLength := binary.BigEndian.Uint16(metadata[offset : offset+2])
		offset += 2

		// Validate topic name length
		if topicNameLength > 1000 {
			return nil, fmt.Errorf("unreasonable topic name length: %d", topicNameLength)
		}

		if len(metadata) < offset+int(topicNameLength) {
			return nil, fmt.Errorf("metadata too short for topic %d name data", i)
		}

		topicName := string(metadata[offset : offset+int(topicNameLength)])
		offset += int(topicNameLength)

		// Validate topic name (basic validation)
		if len(topicName) == 0 {
			continue // Skip empty topic names
		}

		result.Topics = append(result.Topics, topicName)
	}

	// Parse user data if remaining bytes exist
	if len(metadata) >= offset+4 {
		userDataLength := binary.BigEndian.Uint32(metadata[offset : offset+4])
		offset += 4

		// Handle -1 (0xFFFFFFFF) as null/empty user data (Kafka protocol convention)
		if userDataLength == 0xFFFFFFFF {
			result.UserData = []byte{}
			return result, nil
		}

		// Validate user data length
		if userDataLength > 100000 { // 100KB limit
			return nil, fmt.Errorf("unreasonable user data length: %d", userDataLength)
		}

		if len(metadata) >= offset+int(userDataLength) {
			result.UserData = make([]byte, userDataLength)
			copy(result.UserData, metadata[offset:offset+int(userDataLength)])
		}
	}

	return result, nil
}

// ValidateAssignmentStrategy checks if an assignment strategy is supported
func ValidateAssignmentStrategy(strategy string) bool {
	supportedStrategies := map[string]bool{
		consumer.ProtocolNameRange:             true,
		consumer.ProtocolNameRoundRobin:        true,
		consumer.ProtocolNameSticky:            true,
		consumer.ProtocolNameCooperativeSticky: true, // Incremental cooperative rebalancing (Kafka 2.4+)
	}

	return supportedStrategies[strategy]
}

// ExtractTopicsFromMetadata extracts topic list from protocol metadata with fallback
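// Protocols are checked in the order the client sent them; the first supported
// protocol whose metadata parses and names at least one topic wins.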
func ExtractTopicsFromMetadata(protocols []GroupProtocol, fallbackTopics []string) []string {
	for _, protocol := range protocols {
		if ValidateAssignmentStrategy(protocol.Name) {
			parsed, err := ParseConsumerProtocolMetadata(protocol.Metadata, protocol.Name)
			if err != nil {
				continue
			}

			if len(parsed.Topics) > 0 {
				return parsed.Topics
			}
		}
	}

	// Fallback to provided topics or empty list
	if len(fallbackTopics) > 0 {
		return fallbackTopics
	}

	// Return empty slice if no topics found - consumer may be using pattern subscription
	return []string{}
}

// SelectBestProtocol chooses the best assignment protocol from available options
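// A protocol is chosen only if the client supports it and, when the group already
// has protocols, the group supports it as well; an empty string means no compatible
// protocol was found.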
func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) string {
	// Priority order: sticky > roundrobin > range
	protocolPriority := []string{consumer.ProtocolNameSticky, consumer.ProtocolNameRoundRobin, consumer.ProtocolNameRange}

	// Find supported protocols in client's list
	clientProtocols := make(map[string]bool)
	for _, protocol := range protocols {
		if ValidateAssignmentStrategy(protocol.Name) {
			clientProtocols[protocol.Name] = true
		}
	}

	// Find supported protocols in group's list
	groupProtocolSet := make(map[string]bool)
	for _, protocol := range groupProtocols {
		groupProtocolSet[protocol] = true
	}

	// Select highest priority protocol that both client and group support
	for _, preferred := range protocolPriority {
		if clientProtocols[preferred] && (len(groupProtocols) == 0 || groupProtocolSet[preferred]) {
			return preferred
		}
	}

	// The group has existing protocols, but the priority loop above found none that
	// both client and group support, so only the fallback cases remain.
	if len(groupProtocols) > 0 {
		// If the client offered no strategy we recognize but the group already uses
		// "range", fall back to "range"
		if len(clientProtocols) == 0 && groupProtocolSet[consumer.ProtocolNameRange] {
			return consumer.ProtocolNameRange
		}

		// Return empty string to indicate no compatible protocol found
		return ""
	}

	// Fallback to first supported protocol from client (only when group has no existing protocols)
	for _, protocol := range protocols {
		if ValidateAssignmentStrategy(protocol.Name) {
			return protocol.Name
		}
	}

	// Last resort
	return consumer.ProtocolNameRange
}

// ProtocolMetadataDebugInfo returns debug information about protocol metadata
type ProtocolMetadataDebugInfo struct {
	Strategy     string
	Version      int16
	TopicCount   int
	Topics       []string
	UserDataSize int
	ParsedOK     bool
	ParseError   string
}

// AnalyzeProtocolMetadata provides detailed debug information about protocol metadata
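// The result has one entry per offered protocol, recording either the parsed
// contents or the parse error, which can be useful when debugging JoinGroup requests.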
func AnalyzeProtocolMetadata(protocols []GroupProtocol) []ProtocolMetadataDebugInfo {
	result := make([]ProtocolMetadataDebugInfo, 0, len(protocols))

	for _, protocol := range protocols {
		info := ProtocolMetadataDebugInfo{
			Strategy: protocol.Name,
		}

		parsed, err := ParseConsumerProtocolMetadata(protocol.Metadata, protocol.Name)
		if err != nil {
			info.ParsedOK = false
			info.ParseError = err.Error()
		} else {
			info.ParsedOK = true
			info.Version = parsed.Version
			info.TopicCount = len(parsed.Topics)
			info.Topics = parsed.Topics
			info.UserDataSize = len(parsed.UserData)
		}

		result = append(result, info)
	}

	return result
}