diff options
Diffstat (limited to 'weed/mq/kafka/consumer/assignment.go')
| -rw-r--r-- | weed/mq/kafka/consumer/assignment.go | 195 |
1 files changed, 13 insertions, 182 deletions
diff --git a/weed/mq/kafka/consumer/assignment.go b/weed/mq/kafka/consumer/assignment.go index 5799ed2b5..706efe5c9 100644 --- a/weed/mq/kafka/consumer/assignment.go +++ b/weed/mq/kafka/consumer/assignment.go @@ -4,6 +4,14 @@ import ( "sort" ) +// Assignment strategy protocol names +const ( + ProtocolNameRange = "range" + ProtocolNameRoundRobin = "roundrobin" + ProtocolNameSticky = "sticky" + ProtocolNameCooperativeSticky = "cooperative-sticky" +) + // AssignmentStrategy defines how partitions are assigned to consumers type AssignmentStrategy interface { Name() string @@ -15,7 +23,7 @@ type AssignmentStrategy interface { type RangeAssignmentStrategy struct{} func (r *RangeAssignmentStrategy) Name() string { - return "range" + return ProtocolNameRange } func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { @@ -104,7 +112,7 @@ func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions type RoundRobinAssignmentStrategy struct{} func (rr *RoundRobinAssignmentStrategy) Name() string { - return "roundrobin" + return ProtocolNameRoundRobin } func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { @@ -194,191 +202,14 @@ func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPart return assignments } -// CooperativeStickyAssignmentStrategy implements the cooperative-sticky assignment strategy -// This strategy tries to minimize partition movement during rebalancing while ensuring fairness -type CooperativeStickyAssignmentStrategy struct{} - -func (cs *CooperativeStickyAssignmentStrategy) Name() string { - return "cooperative-sticky" -} - -func (cs *CooperativeStickyAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { - if len(members) == 0 { - return make(map[string][]PartitionAssignment) - } - - assignments := make(map[string][]PartitionAssignment) - for _, member := range members { - assignments[member.ID] = make([]PartitionAssignment, 0) - } - - // Sort members for consistent assignment - sortedMembers := make([]*GroupMember, len(members)) - copy(sortedMembers, members) - sort.Slice(sortedMembers, func(i, j int) bool { - return sortedMembers[i].ID < sortedMembers[j].ID - }) - - // Get all subscribed topics - subscribedTopics := make(map[string]bool) - for _, member := range members { - for _, topic := range member.Subscription { - subscribedTopics[topic] = true - } - } - - // Collect all partitions that need assignment - allPartitions := make([]PartitionAssignment, 0) - for topic := range subscribedTopics { - partitions, exists := topicPartitions[topic] - if !exists { - continue - } - - for _, partition := range partitions { - allPartitions = append(allPartitions, PartitionAssignment{ - Topic: topic, - Partition: partition, - }) - } - } - - // Sort partitions for consistent assignment - sort.Slice(allPartitions, func(i, j int) bool { - if allPartitions[i].Topic != allPartitions[j].Topic { - return allPartitions[i].Topic < allPartitions[j].Topic - } - return allPartitions[i].Partition < allPartitions[j].Partition - }) - - // Calculate target assignment counts for fairness - totalPartitions := len(allPartitions) - numMembers := len(sortedMembers) - baseAssignments := totalPartitions / numMembers - extraAssignments := totalPartitions % numMembers - - // Phase 1: Try to preserve existing assignments (sticky behavior) but respect fairness - currentAssignments := make(map[string]map[PartitionAssignment]bool) - for _, member := range sortedMembers { - currentAssignments[member.ID] = make(map[PartitionAssignment]bool) - for _, assignment := range member.Assignment { - currentAssignments[member.ID][assignment] = true - } - } - - // Track which partitions are already assigned - assignedPartitions := make(map[PartitionAssignment]bool) - - // Preserve existing assignments where possible, but respect target counts - for i, member := range sortedMembers { - // Calculate target count for this member - targetCount := baseAssignments - if i < extraAssignments { - targetCount++ - } - - assignedCount := 0 - for assignment := range currentAssignments[member.ID] { - // Stop if we've reached the target count for this member - if assignedCount >= targetCount { - break - } - - // Check if member is still subscribed to this topic - subscribed := false - for _, topic := range member.Subscription { - if topic == assignment.Topic { - subscribed = true - break - } - } - - if subscribed && !assignedPartitions[assignment] { - assignments[member.ID] = append(assignments[member.ID], assignment) - assignedPartitions[assignment] = true - assignedCount++ - } - } - } - - // Phase 2: Assign remaining partitions using round-robin for fairness - unassignedPartitions := make([]PartitionAssignment, 0) - for _, partition := range allPartitions { - if !assignedPartitions[partition] { - unassignedPartitions = append(unassignedPartitions, partition) - } - } - - // Assign remaining partitions to achieve fairness - memberIndex := 0 - for _, partition := range unassignedPartitions { - // Find a member that needs more partitions and is subscribed to this topic - assigned := false - startIndex := memberIndex - - for !assigned { - member := sortedMembers[memberIndex] - - // Check if this member is subscribed to the topic - subscribed := false - for _, topic := range member.Subscription { - if topic == partition.Topic { - subscribed = true - break - } - } - - if subscribed { - // Calculate target count for this member - targetCount := baseAssignments - if memberIndex < extraAssignments { - targetCount++ - } - - // Assign if member needs more partitions - if len(assignments[member.ID]) < targetCount { - assignments[member.ID] = append(assignments[member.ID], partition) - assigned = true - } - } - - memberIndex = (memberIndex + 1) % numMembers - - // Prevent infinite loop - if memberIndex == startIndex && !assigned { - // Force assign to any subscribed member - for _, member := range sortedMembers { - subscribed := false - for _, topic := range member.Subscription { - if topic == partition.Topic { - subscribed = true - break - } - } - if subscribed { - assignments[member.ID] = append(assignments[member.ID], partition) - assigned = true - break - } - } - break - } - } - } - - return assignments -} - // GetAssignmentStrategy returns the appropriate assignment strategy func GetAssignmentStrategy(name string) AssignmentStrategy { switch name { - case "range": + case ProtocolNameRange: return &RangeAssignmentStrategy{} - case "roundrobin": + case ProtocolNameRoundRobin: return &RoundRobinAssignmentStrategy{} - case "cooperative-sticky": - return &CooperativeStickyAssignmentStrategy{} - case "incremental-cooperative": + case ProtocolNameCooperativeSticky: return NewIncrementalCooperativeAssignmentStrategy() default: // Default to range strategy |
