aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/assignment.go
blob: 706efe5c92a42fc94aea08a7d582035da147be10 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package consumer

import (
	"sort"
)

// Assignment strategy protocol names
const (
	ProtocolNameRange             = "range"
	ProtocolNameRoundRobin        = "roundrobin"
	ProtocolNameSticky            = "sticky"
	ProtocolNameCooperativeSticky = "cooperative-sticky"
)

// AssignmentStrategy defines how partitions are assigned to consumers
type AssignmentStrategy interface {
	Name() string
	Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment
}

// RangeAssignmentStrategy implements the Range assignment strategy
// Assigns partitions in ranges to consumers, similar to Kafka's range assignor
type RangeAssignmentStrategy struct{}

func (r *RangeAssignmentStrategy) Name() string {
	return ProtocolNameRange
}

func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment {
	if len(members) == 0 {
		return make(map[string][]PartitionAssignment)
	}

	assignments := make(map[string][]PartitionAssignment)
	for _, member := range members {
		assignments[member.ID] = make([]PartitionAssignment, 0)
	}

	// Sort members for consistent assignment
	sortedMembers := make([]*GroupMember, len(members))
	copy(sortedMembers, members)
	sort.Slice(sortedMembers, func(i, j int) bool {
		return sortedMembers[i].ID < sortedMembers[j].ID
	})

	// Get all subscribed topics
	subscribedTopics := make(map[string]bool)
	for _, member := range members {
		for _, topic := range member.Subscription {
			subscribedTopics[topic] = true
		}
	}

	// Assign partitions for each topic
	for topic := range subscribedTopics {
		partitions, exists := topicPartitions[topic]
		if !exists {
			continue
		}

		// Sort partitions for consistent assignment
		sort.Slice(partitions, func(i, j int) bool {
			return partitions[i] < partitions[j]
		})

		// Find members subscribed to this topic
		topicMembers := make([]*GroupMember, 0)
		for _, member := range sortedMembers {
			for _, subscribedTopic := range member.Subscription {
				if subscribedTopic == topic {
					topicMembers = append(topicMembers, member)
					break
				}
			}
		}

		if len(topicMembers) == 0 {
			continue
		}

		// Assign partitions to members using range strategy
		numPartitions := len(partitions)
		numMembers := len(topicMembers)
		partitionsPerMember := numPartitions / numMembers
		remainingPartitions := numPartitions % numMembers

		partitionIndex := 0
		for memberIndex, member := range topicMembers {
			// Calculate how many partitions this member should get
			memberPartitions := partitionsPerMember
			if memberIndex < remainingPartitions {
				memberPartitions++
			}

			// Assign partitions to this member
			for i := 0; i < memberPartitions && partitionIndex < numPartitions; i++ {
				assignment := PartitionAssignment{
					Topic:     topic,
					Partition: partitions[partitionIndex],
				}
				assignments[member.ID] = append(assignments[member.ID], assignment)
				partitionIndex++
			}
		}
	}

	return assignments
}

// RoundRobinAssignmentStrategy implements the RoundRobin assignment strategy
// Distributes partitions evenly across all consumers in round-robin fashion
type RoundRobinAssignmentStrategy struct{}

func (rr *RoundRobinAssignmentStrategy) Name() string {
	return ProtocolNameRoundRobin
}

func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment {
	if len(members) == 0 {
		return make(map[string][]PartitionAssignment)
	}

	assignments := make(map[string][]PartitionAssignment)
	for _, member := range members {
		assignments[member.ID] = make([]PartitionAssignment, 0)
	}

	// Sort members for consistent assignment
	sortedMembers := make([]*GroupMember, len(members))
	copy(sortedMembers, members)
	sort.Slice(sortedMembers, func(i, j int) bool {
		return sortedMembers[i].ID < sortedMembers[j].ID
	})

	// Collect all partition assignments across all topics
	allAssignments := make([]PartitionAssignment, 0)

	// Get all subscribed topics
	subscribedTopics := make(map[string]bool)
	for _, member := range members {
		for _, topic := range member.Subscription {
			subscribedTopics[topic] = true
		}
	}

	// Collect all partitions from all subscribed topics
	for topic := range subscribedTopics {
		partitions, exists := topicPartitions[topic]
		if !exists {
			continue
		}

		for _, partition := range partitions {
			allAssignments = append(allAssignments, PartitionAssignment{
				Topic:     topic,
				Partition: partition,
			})
		}
	}

	// Sort assignments for consistent distribution
	sort.Slice(allAssignments, func(i, j int) bool {
		if allAssignments[i].Topic != allAssignments[j].Topic {
			return allAssignments[i].Topic < allAssignments[j].Topic
		}
		return allAssignments[i].Partition < allAssignments[j].Partition
	})

	// Distribute partitions in round-robin fashion
	memberIndex := 0
	for _, assignment := range allAssignments {
		// Find a member that is subscribed to this topic
		assigned := false
		startIndex := memberIndex

		for !assigned {
			member := sortedMembers[memberIndex]

			// Check if this member is subscribed to the topic
			subscribed := false
			for _, topic := range member.Subscription {
				if topic == assignment.Topic {
					subscribed = true
					break
				}
			}

			if subscribed {
				assignments[member.ID] = append(assignments[member.ID], assignment)
				assigned = true
			}

			memberIndex = (memberIndex + 1) % len(sortedMembers)

			// Prevent infinite loop if no member is subscribed to this topic
			if memberIndex == startIndex && !assigned {
				break
			}
		}
	}

	return assignments
}

// GetAssignmentStrategy returns the appropriate assignment strategy
func GetAssignmentStrategy(name string) AssignmentStrategy {
	switch name {
	case ProtocolNameRange:
		return &RangeAssignmentStrategy{}
	case ProtocolNameRoundRobin:
		return &RoundRobinAssignmentStrategy{}
	case ProtocolNameCooperativeSticky:
		return NewIncrementalCooperativeAssignmentStrategy()
	default:
		// Default to range strategy
		return &RangeAssignmentStrategy{}
	}
}

// AssignPartitions performs partition assignment for a consumer group
func (group *ConsumerGroup) AssignPartitions(topicPartitions map[string][]int32) {
	if len(group.Members) == 0 {
		return
	}

	// Convert members map to slice
	members := make([]*GroupMember, 0, len(group.Members))
	for _, member := range group.Members {
		if member.State == MemberStateStable || member.State == MemberStatePending {
			members = append(members, member)
		}
	}

	if len(members) == 0 {
		return
	}

	// Get assignment strategy
	strategy := GetAssignmentStrategy(group.Protocol)
	assignments := strategy.Assign(members, topicPartitions)

	// Apply assignments to members
	for memberID, assignment := range assignments {
		if member, exists := group.Members[memberID]; exists {
			member.Assignment = assignment
		}
	}
}

// GetMemberAssignments returns the current partition assignments for all members
func (group *ConsumerGroup) GetMemberAssignments() map[string][]PartitionAssignment {
	group.Mu.RLock()
	defer group.Mu.RUnlock()

	assignments := make(map[string][]PartitionAssignment)
	for memberID, member := range group.Members {
		assignments[memberID] = make([]PartitionAssignment, len(member.Assignment))
		copy(assignments[memberID], member.Assignment)
	}

	return assignments
}

// UpdateMemberSubscription updates a member's topic subscription
func (group *ConsumerGroup) UpdateMemberSubscription(memberID string, topics []string) {
	group.Mu.Lock()
	defer group.Mu.Unlock()

	member, exists := group.Members[memberID]
	if !exists {
		return
	}

	// Update member subscription
	member.Subscription = make([]string, len(topics))
	copy(member.Subscription, topics)

	// Update group's subscribed topics
	group.SubscribedTopics = make(map[string]bool)
	for _, m := range group.Members {
		for _, topic := range m.Subscription {
			group.SubscribedTopics[topic] = true
		}
	}
}

// GetSubscribedTopics returns all topics subscribed by the group
func (group *ConsumerGroup) GetSubscribedTopics() []string {
	group.Mu.RLock()
	defer group.Mu.RUnlock()

	topics := make([]string, 0, len(group.SubscribedTopics))
	for topic := range group.SubscribedTopics {
		topics = append(topics, topic)
	}

	sort.Strings(topics)
	return topics
}