diff options
Diffstat (limited to 'weed/mq/kafka/consumer/assignment.go')
| -rw-r--r-- | weed/mq/kafka/consumer/assignment.go | 468 |
1 files changed, 468 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/assignment.go b/weed/mq/kafka/consumer/assignment.go new file mode 100644 index 000000000..5799ed2b5 --- /dev/null +++ b/weed/mq/kafka/consumer/assignment.go @@ -0,0 +1,468 @@ +package consumer + +import ( + "sort" +) + +// AssignmentStrategy defines how partitions are assigned to consumers +type AssignmentStrategy interface { + Name() string + Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment +} + +// RangeAssignmentStrategy implements the Range assignment strategy +// Assigns partitions in ranges to consumers, similar to Kafka's range assignor +type RangeAssignmentStrategy struct{} + +func (r *RangeAssignmentStrategy) Name() string { + return "range" +} + +func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { + if len(members) == 0 { + return make(map[string][]PartitionAssignment) + } + + assignments := make(map[string][]PartitionAssignment) + for _, member := range members { + assignments[member.ID] = make([]PartitionAssignment, 0) + } + + // Sort members for consistent assignment + sortedMembers := make([]*GroupMember, len(members)) + copy(sortedMembers, members) + sort.Slice(sortedMembers, func(i, j int) bool { + return sortedMembers[i].ID < sortedMembers[j].ID + }) + + // Get all subscribed topics + subscribedTopics := make(map[string]bool) + for _, member := range members { + for _, topic := range member.Subscription { + subscribedTopics[topic] = true + } + } + + // Assign partitions for each topic + for topic := range subscribedTopics { + partitions, exists := topicPartitions[topic] + if !exists { + continue + } + + // Sort partitions for consistent assignment + sort.Slice(partitions, func(i, j int) bool { + return partitions[i] < partitions[j] + }) + + // Find members subscribed to this topic + topicMembers := make([]*GroupMember, 0) + for _, member := range sortedMembers { + for _, subscribedTopic := range member.Subscription { + if subscribedTopic == topic { + topicMembers = append(topicMembers, member) + break + } + } + } + + if len(topicMembers) == 0 { + continue + } + + // Assign partitions to members using range strategy + numPartitions := len(partitions) + numMembers := len(topicMembers) + partitionsPerMember := numPartitions / numMembers + remainingPartitions := numPartitions % numMembers + + partitionIndex := 0 + for memberIndex, member := range topicMembers { + // Calculate how many partitions this member should get + memberPartitions := partitionsPerMember + if memberIndex < remainingPartitions { + memberPartitions++ + } + + // Assign partitions to this member + for i := 0; i < memberPartitions && partitionIndex < numPartitions; i++ { + assignment := PartitionAssignment{ + Topic: topic, + Partition: partitions[partitionIndex], + } + assignments[member.ID] = append(assignments[member.ID], assignment) + partitionIndex++ + } + } + } + + return assignments +} + +// RoundRobinAssignmentStrategy implements the RoundRobin assignment strategy +// Distributes partitions evenly across all consumers in round-robin fashion +type RoundRobinAssignmentStrategy struct{} + +func (rr *RoundRobinAssignmentStrategy) Name() string { + return "roundrobin" +} + +func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { + if len(members) == 0 { + return make(map[string][]PartitionAssignment) + } + + assignments := make(map[string][]PartitionAssignment) + for _, member := range members { + assignments[member.ID] = make([]PartitionAssignment, 0) + } + + // Sort members for consistent assignment + sortedMembers := make([]*GroupMember, len(members)) + copy(sortedMembers, members) + sort.Slice(sortedMembers, func(i, j int) bool { + return sortedMembers[i].ID < sortedMembers[j].ID + }) + + // Collect all partition assignments across all topics + allAssignments := make([]PartitionAssignment, 0) + + // Get all subscribed topics + subscribedTopics := make(map[string]bool) + for _, member := range members { + for _, topic := range member.Subscription { + subscribedTopics[topic] = true + } + } + + // Collect all partitions from all subscribed topics + for topic := range subscribedTopics { + partitions, exists := topicPartitions[topic] + if !exists { + continue + } + + for _, partition := range partitions { + allAssignments = append(allAssignments, PartitionAssignment{ + Topic: topic, + Partition: partition, + }) + } + } + + // Sort assignments for consistent distribution + sort.Slice(allAssignments, func(i, j int) bool { + if allAssignments[i].Topic != allAssignments[j].Topic { + return allAssignments[i].Topic < allAssignments[j].Topic + } + return allAssignments[i].Partition < allAssignments[j].Partition + }) + + // Distribute partitions in round-robin fashion + memberIndex := 0 + for _, assignment := range allAssignments { + // Find a member that is subscribed to this topic + assigned := false + startIndex := memberIndex + + for !assigned { + member := sortedMembers[memberIndex] + + // Check if this member is subscribed to the topic + subscribed := false + for _, topic := range member.Subscription { + if topic == assignment.Topic { + subscribed = true + break + } + } + + if subscribed { + assignments[member.ID] = append(assignments[member.ID], assignment) + assigned = true + } + + memberIndex = (memberIndex + 1) % len(sortedMembers) + + // Prevent infinite loop if no member is subscribed to this topic + if memberIndex == startIndex && !assigned { + break + } + } + } + + return assignments +} + +// CooperativeStickyAssignmentStrategy implements the cooperative-sticky assignment strategy +// This strategy tries to minimize partition movement during rebalancing while ensuring fairness +type CooperativeStickyAssignmentStrategy struct{} + +func (cs *CooperativeStickyAssignmentStrategy) Name() string { + return "cooperative-sticky" +} + +func (cs *CooperativeStickyAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { + if len(members) == 0 { + return make(map[string][]PartitionAssignment) + } + + assignments := make(map[string][]PartitionAssignment) + for _, member := range members { + assignments[member.ID] = make([]PartitionAssignment, 0) + } + + // Sort members for consistent assignment + sortedMembers := make([]*GroupMember, len(members)) + copy(sortedMembers, members) + sort.Slice(sortedMembers, func(i, j int) bool { + return sortedMembers[i].ID < sortedMembers[j].ID + }) + + // Get all subscribed topics + subscribedTopics := make(map[string]bool) + for _, member := range members { + for _, topic := range member.Subscription { + subscribedTopics[topic] = true + } + } + + // Collect all partitions that need assignment + allPartitions := make([]PartitionAssignment, 0) + for topic := range subscribedTopics { + partitions, exists := topicPartitions[topic] + if !exists { + continue + } + + for _, partition := range partitions { + allPartitions = append(allPartitions, PartitionAssignment{ + Topic: topic, + Partition: partition, + }) + } + } + + // Sort partitions for consistent assignment + sort.Slice(allPartitions, func(i, j int) bool { + if allPartitions[i].Topic != allPartitions[j].Topic { + return allPartitions[i].Topic < allPartitions[j].Topic + } + return allPartitions[i].Partition < allPartitions[j].Partition + }) + + // Calculate target assignment counts for fairness + totalPartitions := len(allPartitions) + numMembers := len(sortedMembers) + baseAssignments := totalPartitions / numMembers + extraAssignments := totalPartitions % numMembers + + // Phase 1: Try to preserve existing assignments (sticky behavior) but respect fairness + currentAssignments := make(map[string]map[PartitionAssignment]bool) + for _, member := range sortedMembers { + currentAssignments[member.ID] = make(map[PartitionAssignment]bool) + for _, assignment := range member.Assignment { + currentAssignments[member.ID][assignment] = true + } + } + + // Track which partitions are already assigned + assignedPartitions := make(map[PartitionAssignment]bool) + + // Preserve existing assignments where possible, but respect target counts + for i, member := range sortedMembers { + // Calculate target count for this member + targetCount := baseAssignments + if i < extraAssignments { + targetCount++ + } + + assignedCount := 0 + for assignment := range currentAssignments[member.ID] { + // Stop if we've reached the target count for this member + if assignedCount >= targetCount { + break + } + + // Check if member is still subscribed to this topic + subscribed := false + for _, topic := range member.Subscription { + if topic == assignment.Topic { + subscribed = true + break + } + } + + if subscribed && !assignedPartitions[assignment] { + assignments[member.ID] = append(assignments[member.ID], assignment) + assignedPartitions[assignment] = true + assignedCount++ + } + } + } + + // Phase 2: Assign remaining partitions using round-robin for fairness + unassignedPartitions := make([]PartitionAssignment, 0) + for _, partition := range allPartitions { + if !assignedPartitions[partition] { + unassignedPartitions = append(unassignedPartitions, partition) + } + } + + // Assign remaining partitions to achieve fairness + memberIndex := 0 + for _, partition := range unassignedPartitions { + // Find a member that needs more partitions and is subscribed to this topic + assigned := false + startIndex := memberIndex + + for !assigned { + member := sortedMembers[memberIndex] + + // Check if this member is subscribed to the topic + subscribed := false + for _, topic := range member.Subscription { + if topic == partition.Topic { + subscribed = true + break + } + } + + if subscribed { + // Calculate target count for this member + targetCount := baseAssignments + if memberIndex < extraAssignments { + targetCount++ + } + + // Assign if member needs more partitions + if len(assignments[member.ID]) < targetCount { + assignments[member.ID] = append(assignments[member.ID], partition) + assigned = true + } + } + + memberIndex = (memberIndex + 1) % numMembers + + // Prevent infinite loop + if memberIndex == startIndex && !assigned { + // Force assign to any subscribed member + for _, member := range sortedMembers { + subscribed := false + for _, topic := range member.Subscription { + if topic == partition.Topic { + subscribed = true + break + } + } + if subscribed { + assignments[member.ID] = append(assignments[member.ID], partition) + assigned = true + break + } + } + break + } + } + } + + return assignments +} + +// GetAssignmentStrategy returns the appropriate assignment strategy +func GetAssignmentStrategy(name string) AssignmentStrategy { + switch name { + case "range": + return &RangeAssignmentStrategy{} + case "roundrobin": + return &RoundRobinAssignmentStrategy{} + case "cooperative-sticky": + return &CooperativeStickyAssignmentStrategy{} + case "incremental-cooperative": + return NewIncrementalCooperativeAssignmentStrategy() + default: + // Default to range strategy + return &RangeAssignmentStrategy{} + } +} + +// AssignPartitions performs partition assignment for a consumer group +func (group *ConsumerGroup) AssignPartitions(topicPartitions map[string][]int32) { + if len(group.Members) == 0 { + return + } + + // Convert members map to slice + members := make([]*GroupMember, 0, len(group.Members)) + for _, member := range group.Members { + if member.State == MemberStateStable || member.State == MemberStatePending { + members = append(members, member) + } + } + + if len(members) == 0 { + return + } + + // Get assignment strategy + strategy := GetAssignmentStrategy(group.Protocol) + assignments := strategy.Assign(members, topicPartitions) + + // Apply assignments to members + for memberID, assignment := range assignments { + if member, exists := group.Members[memberID]; exists { + member.Assignment = assignment + } + } +} + +// GetMemberAssignments returns the current partition assignments for all members +func (group *ConsumerGroup) GetMemberAssignments() map[string][]PartitionAssignment { + group.Mu.RLock() + defer group.Mu.RUnlock() + + assignments := make(map[string][]PartitionAssignment) + for memberID, member := range group.Members { + assignments[memberID] = make([]PartitionAssignment, len(member.Assignment)) + copy(assignments[memberID], member.Assignment) + } + + return assignments +} + +// UpdateMemberSubscription updates a member's topic subscription +func (group *ConsumerGroup) UpdateMemberSubscription(memberID string, topics []string) { + group.Mu.Lock() + defer group.Mu.Unlock() + + member, exists := group.Members[memberID] + if !exists { + return + } + + // Update member subscription + member.Subscription = make([]string, len(topics)) + copy(member.Subscription, topics) + + // Update group's subscribed topics + group.SubscribedTopics = make(map[string]bool) + for _, m := range group.Members { + for _, topic := range m.Subscription { + group.SubscribedTopics[topic] = true + } + } +} + +// GetSubscribedTopics returns all topics subscribed by the group +func (group *ConsumerGroup) GetSubscribedTopics() []string { + group.Mu.RLock() + defer group.Mu.RUnlock() + + topics := make([]string, 0, len(group.SubscribedTopics)) + for topic := range group.SubscribedTopics { + topics = append(topics, topic) + } + + sort.Strings(topics) + return topics +} |
