aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/assignment.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer/assignment.go')
-rw-r--r--weed/mq/kafka/consumer/assignment.go195
1 files changed, 13 insertions, 182 deletions
diff --git a/weed/mq/kafka/consumer/assignment.go b/weed/mq/kafka/consumer/assignment.go
index 5799ed2b5..706efe5c9 100644
--- a/weed/mq/kafka/consumer/assignment.go
+++ b/weed/mq/kafka/consumer/assignment.go
@@ -4,6 +4,14 @@ import (
"sort"
)
+// Assignment strategy protocol names
+const (
+ ProtocolNameRange = "range"
+ ProtocolNameRoundRobin = "roundrobin"
+ ProtocolNameSticky = "sticky"
+ ProtocolNameCooperativeSticky = "cooperative-sticky"
+)
+
// AssignmentStrategy defines how partitions are assigned to consumers
type AssignmentStrategy interface {
Name() string
@@ -15,7 +23,7 @@ type AssignmentStrategy interface {
type RangeAssignmentStrategy struct{}
func (r *RangeAssignmentStrategy) Name() string {
- return "range"
+ return ProtocolNameRange
}
func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment {
@@ -104,7 +112,7 @@ func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions
type RoundRobinAssignmentStrategy struct{}
func (rr *RoundRobinAssignmentStrategy) Name() string {
- return "roundrobin"
+ return ProtocolNameRoundRobin
}
func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment {
@@ -194,191 +202,14 @@ func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPart
return assignments
}
-// CooperativeStickyAssignmentStrategy implements the cooperative-sticky assignment strategy
-// This strategy tries to minimize partition movement during rebalancing while ensuring fairness
-type CooperativeStickyAssignmentStrategy struct{}
-
-func (cs *CooperativeStickyAssignmentStrategy) Name() string {
- return "cooperative-sticky"
-}
-
-func (cs *CooperativeStickyAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment {
- if len(members) == 0 {
- return make(map[string][]PartitionAssignment)
- }
-
- assignments := make(map[string][]PartitionAssignment)
- for _, member := range members {
- assignments[member.ID] = make([]PartitionAssignment, 0)
- }
-
- // Sort members for consistent assignment
- sortedMembers := make([]*GroupMember, len(members))
- copy(sortedMembers, members)
- sort.Slice(sortedMembers, func(i, j int) bool {
- return sortedMembers[i].ID < sortedMembers[j].ID
- })
-
- // Get all subscribed topics
- subscribedTopics := make(map[string]bool)
- for _, member := range members {
- for _, topic := range member.Subscription {
- subscribedTopics[topic] = true
- }
- }
-
- // Collect all partitions that need assignment
- allPartitions := make([]PartitionAssignment, 0)
- for topic := range subscribedTopics {
- partitions, exists := topicPartitions[topic]
- if !exists {
- continue
- }
-
- for _, partition := range partitions {
- allPartitions = append(allPartitions, PartitionAssignment{
- Topic: topic,
- Partition: partition,
- })
- }
- }
-
- // Sort partitions for consistent assignment
- sort.Slice(allPartitions, func(i, j int) bool {
- if allPartitions[i].Topic != allPartitions[j].Topic {
- return allPartitions[i].Topic < allPartitions[j].Topic
- }
- return allPartitions[i].Partition < allPartitions[j].Partition
- })
-
- // Calculate target assignment counts for fairness
- totalPartitions := len(allPartitions)
- numMembers := len(sortedMembers)
- baseAssignments := totalPartitions / numMembers
- extraAssignments := totalPartitions % numMembers
-
- // Phase 1: Try to preserve existing assignments (sticky behavior) but respect fairness
- currentAssignments := make(map[string]map[PartitionAssignment]bool)
- for _, member := range sortedMembers {
- currentAssignments[member.ID] = make(map[PartitionAssignment]bool)
- for _, assignment := range member.Assignment {
- currentAssignments[member.ID][assignment] = true
- }
- }
-
- // Track which partitions are already assigned
- assignedPartitions := make(map[PartitionAssignment]bool)
-
- // Preserve existing assignments where possible, but respect target counts
- for i, member := range sortedMembers {
- // Calculate target count for this member
- targetCount := baseAssignments
- if i < extraAssignments {
- targetCount++
- }
-
- assignedCount := 0
- for assignment := range currentAssignments[member.ID] {
- // Stop if we've reached the target count for this member
- if assignedCount >= targetCount {
- break
- }
-
- // Check if member is still subscribed to this topic
- subscribed := false
- for _, topic := range member.Subscription {
- if topic == assignment.Topic {
- subscribed = true
- break
- }
- }
-
- if subscribed && !assignedPartitions[assignment] {
- assignments[member.ID] = append(assignments[member.ID], assignment)
- assignedPartitions[assignment] = true
- assignedCount++
- }
- }
- }
-
- // Phase 2: Assign remaining partitions using round-robin for fairness
- unassignedPartitions := make([]PartitionAssignment, 0)
- for _, partition := range allPartitions {
- if !assignedPartitions[partition] {
- unassignedPartitions = append(unassignedPartitions, partition)
- }
- }
-
- // Assign remaining partitions to achieve fairness
- memberIndex := 0
- for _, partition := range unassignedPartitions {
- // Find a member that needs more partitions and is subscribed to this topic
- assigned := false
- startIndex := memberIndex
-
- for !assigned {
- member := sortedMembers[memberIndex]
-
- // Check if this member is subscribed to the topic
- subscribed := false
- for _, topic := range member.Subscription {
- if topic == partition.Topic {
- subscribed = true
- break
- }
- }
-
- if subscribed {
- // Calculate target count for this member
- targetCount := baseAssignments
- if memberIndex < extraAssignments {
- targetCount++
- }
-
- // Assign if member needs more partitions
- if len(assignments[member.ID]) < targetCount {
- assignments[member.ID] = append(assignments[member.ID], partition)
- assigned = true
- }
- }
-
- memberIndex = (memberIndex + 1) % numMembers
-
- // Prevent infinite loop
- if memberIndex == startIndex && !assigned {
- // Force assign to any subscribed member
- for _, member := range sortedMembers {
- subscribed := false
- for _, topic := range member.Subscription {
- if topic == partition.Topic {
- subscribed = true
- break
- }
- }
- if subscribed {
- assignments[member.ID] = append(assignments[member.ID], partition)
- assigned = true
- break
- }
- }
- break
- }
- }
- }
-
- return assignments
-}
-
// GetAssignmentStrategy returns the appropriate assignment strategy
func GetAssignmentStrategy(name string) AssignmentStrategy {
switch name {
- case "range":
+ case ProtocolNameRange:
return &RangeAssignmentStrategy{}
- case "roundrobin":
+ case ProtocolNameRoundRobin:
return &RoundRobinAssignmentStrategy{}
- case "cooperative-sticky":
- return &CooperativeStickyAssignmentStrategy{}
- case "incremental-cooperative":
+ case ProtocolNameCooperativeSticky:
return NewIncrementalCooperativeAssignmentStrategy()
default:
// Default to range strategy