diff options
Diffstat (limited to 'weed/mq/kafka/consumer/incremental_rebalancing.go')
| -rw-r--r-- | weed/mq/kafka/consumer/incremental_rebalancing.go | 357 |
1 files changed, 357 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/incremental_rebalancing.go b/weed/mq/kafka/consumer/incremental_rebalancing.go new file mode 100644 index 000000000..10c794375 --- /dev/null +++ b/weed/mq/kafka/consumer/incremental_rebalancing.go @@ -0,0 +1,357 @@ +package consumer + +import ( + "fmt" + "sort" + "time" +) + +// RebalancePhase represents the phase of incremental cooperative rebalancing +type RebalancePhase int + +const ( + RebalancePhaseNone RebalancePhase = iota + RebalancePhaseRevocation + RebalancePhaseAssignment +) + +func (rp RebalancePhase) String() string { + switch rp { + case RebalancePhaseNone: + return "None" + case RebalancePhaseRevocation: + return "Revocation" + case RebalancePhaseAssignment: + return "Assignment" + default: + return "Unknown" + } +} + +// IncrementalRebalanceState tracks the state of incremental cooperative rebalancing +type IncrementalRebalanceState struct { + Phase RebalancePhase + RevocationGeneration int32 // Generation when revocation started + AssignmentGeneration int32 // Generation when assignment started + RevokedPartitions map[string][]PartitionAssignment // Member ID -> revoked partitions + PendingAssignments map[string][]PartitionAssignment // Member ID -> pending assignments + StartTime time.Time + RevocationTimeout time.Duration +} + +// NewIncrementalRebalanceState creates a new incremental rebalance state +func NewIncrementalRebalanceState() *IncrementalRebalanceState { + return &IncrementalRebalanceState{ + Phase: RebalancePhaseNone, + RevokedPartitions: make(map[string][]PartitionAssignment), + PendingAssignments: make(map[string][]PartitionAssignment), + RevocationTimeout: 30 * time.Second, // Default revocation timeout + } +} + +// IncrementalCooperativeAssignmentStrategy implements incremental cooperative rebalancing +// This strategy performs rebalancing in two phases: +// 1. Revocation phase: Members give up partitions that need to be reassigned +// 2. Assignment phase: Members receive new partitions +type IncrementalCooperativeAssignmentStrategy struct { + rebalanceState *IncrementalRebalanceState +} + +func NewIncrementalCooperativeAssignmentStrategy() *IncrementalCooperativeAssignmentStrategy { + return &IncrementalCooperativeAssignmentStrategy{ + rebalanceState: NewIncrementalRebalanceState(), + } +} + +func (ics *IncrementalCooperativeAssignmentStrategy) Name() string { + return "cooperative-sticky" +} + +func (ics *IncrementalCooperativeAssignmentStrategy) Assign( + members []*GroupMember, + topicPartitions map[string][]int32, +) map[string][]PartitionAssignment { + if len(members) == 0 { + return make(map[string][]PartitionAssignment) + } + + // Check if we need to start a new rebalance + if ics.rebalanceState.Phase == RebalancePhaseNone { + return ics.startIncrementalRebalance(members, topicPartitions) + } + + // Continue existing rebalance based on current phase + switch ics.rebalanceState.Phase { + case RebalancePhaseRevocation: + return ics.handleRevocationPhase(members, topicPartitions) + case RebalancePhaseAssignment: + return ics.handleAssignmentPhase(members, topicPartitions) + default: + // Fallback to regular assignment + return ics.performRegularAssignment(members, topicPartitions) + } +} + +// startIncrementalRebalance initiates a new incremental rebalance +func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance( + members []*GroupMember, + topicPartitions map[string][]int32, +) map[string][]PartitionAssignment { + // Calculate ideal assignment + idealAssignment := ics.calculateIdealAssignment(members, topicPartitions) + + // Determine which partitions need to be revoked + partitionsToRevoke := ics.calculateRevocations(members, idealAssignment) + + if len(partitionsToRevoke) == 0 { + // No revocations needed, proceed with regular assignment + return idealAssignment + } + + // Start revocation phase + ics.rebalanceState.Phase = RebalancePhaseRevocation + ics.rebalanceState.StartTime = time.Now() + ics.rebalanceState.RevokedPartitions = partitionsToRevoke + + // Return current assignments minus revoked partitions + return ics.applyRevocations(members, partitionsToRevoke) +} + +// handleRevocationPhase manages the revocation phase of incremental rebalancing +func (ics *IncrementalCooperativeAssignmentStrategy) handleRevocationPhase( + members []*GroupMember, + topicPartitions map[string][]int32, +) map[string][]PartitionAssignment { + // Check if revocation timeout has passed + if time.Since(ics.rebalanceState.StartTime) > ics.rebalanceState.RevocationTimeout { + // Force move to assignment phase + ics.rebalanceState.Phase = RebalancePhaseAssignment + return ics.handleAssignmentPhase(members, topicPartitions) + } + + // Continue with revoked assignments (members should stop consuming revoked partitions) + return ics.getCurrentAssignmentsWithRevocations(members) +} + +// handleAssignmentPhase manages the assignment phase of incremental rebalancing +func (ics *IncrementalCooperativeAssignmentStrategy) handleAssignmentPhase( + members []*GroupMember, + topicPartitions map[string][]int32, +) map[string][]PartitionAssignment { + // Calculate final assignment including previously revoked partitions + finalAssignment := ics.calculateIdealAssignment(members, topicPartitions) + + // Complete the rebalance + ics.rebalanceState.Phase = RebalancePhaseNone + ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment) + ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment) + + return finalAssignment +} + +// calculateIdealAssignment computes the ideal partition assignment +func (ics *IncrementalCooperativeAssignmentStrategy) calculateIdealAssignment( + members []*GroupMember, + topicPartitions map[string][]int32, +) map[string][]PartitionAssignment { + assignments := make(map[string][]PartitionAssignment) + for _, member := range members { + assignments[member.ID] = make([]PartitionAssignment, 0) + } + + // Sort members for consistent assignment + sortedMembers := make([]*GroupMember, len(members)) + copy(sortedMembers, members) + sort.Slice(sortedMembers, func(i, j int) bool { + return sortedMembers[i].ID < sortedMembers[j].ID + }) + + // Get all subscribed topics + subscribedTopics := make(map[string]bool) + for _, member := range members { + for _, topic := range member.Subscription { + subscribedTopics[topic] = true + } + } + + // Collect all partitions that need assignment + allPartitions := make([]PartitionAssignment, 0) + for topic := range subscribedTopics { + partitions, exists := topicPartitions[topic] + if !exists { + continue + } + + for _, partition := range partitions { + allPartitions = append(allPartitions, PartitionAssignment{ + Topic: topic, + Partition: partition, + }) + } + } + + // Sort partitions for consistent assignment + sort.Slice(allPartitions, func(i, j int) bool { + if allPartitions[i].Topic != allPartitions[j].Topic { + return allPartitions[i].Topic < allPartitions[j].Topic + } + return allPartitions[i].Partition < allPartitions[j].Partition + }) + + // Distribute partitions based on subscriptions + if len(allPartitions) > 0 && len(sortedMembers) > 0 { + // Group partitions by topic + partitionsByTopic := make(map[string][]PartitionAssignment) + for _, partition := range allPartitions { + partitionsByTopic[partition.Topic] = append(partitionsByTopic[partition.Topic], partition) + } + + // Assign partitions topic by topic + for topic, topicPartitions := range partitionsByTopic { + // Find members subscribed to this topic + subscribedMembers := make([]*GroupMember, 0) + for _, member := range sortedMembers { + for _, subscribedTopic := range member.Subscription { + if subscribedTopic == topic { + subscribedMembers = append(subscribedMembers, member) + break + } + } + } + + if len(subscribedMembers) == 0 { + continue // No members subscribed to this topic + } + + // Distribute topic partitions among subscribed members + partitionsPerMember := len(topicPartitions) / len(subscribedMembers) + extraPartitions := len(topicPartitions) % len(subscribedMembers) + + partitionIndex := 0 + for i, member := range subscribedMembers { + // Calculate how many partitions this member should get for this topic + numPartitions := partitionsPerMember + if i < extraPartitions { + numPartitions++ + } + + // Assign partitions to this member + for j := 0; j < numPartitions && partitionIndex < len(topicPartitions); j++ { + assignments[member.ID] = append(assignments[member.ID], topicPartitions[partitionIndex]) + partitionIndex++ + } + } + } + } + + return assignments +} + +// calculateRevocations determines which partitions need to be revoked for rebalancing +func (ics *IncrementalCooperativeAssignmentStrategy) calculateRevocations( + members []*GroupMember, + idealAssignment map[string][]PartitionAssignment, +) map[string][]PartitionAssignment { + revocations := make(map[string][]PartitionAssignment) + + for _, member := range members { + currentAssignment := member.Assignment + memberIdealAssignment := idealAssignment[member.ID] + + // Find partitions that are currently assigned but not in ideal assignment + currentMap := make(map[string]bool) + for _, assignment := range currentAssignment { + key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition) + currentMap[key] = true + } + + idealMap := make(map[string]bool) + for _, assignment := range memberIdealAssignment { + key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition) + idealMap[key] = true + } + + // Identify partitions to revoke + var toRevoke []PartitionAssignment + for _, assignment := range currentAssignment { + key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition) + if !idealMap[key] { + toRevoke = append(toRevoke, assignment) + } + } + + if len(toRevoke) > 0 { + revocations[member.ID] = toRevoke + } + } + + return revocations +} + +// applyRevocations returns current assignments with specified partitions revoked +func (ics *IncrementalCooperativeAssignmentStrategy) applyRevocations( + members []*GroupMember, + revocations map[string][]PartitionAssignment, +) map[string][]PartitionAssignment { + assignments := make(map[string][]PartitionAssignment) + + for _, member := range members { + assignments[member.ID] = make([]PartitionAssignment, 0) + + // Get revoked partitions for this member + revokedPartitions := make(map[string]bool) + if revoked, exists := revocations[member.ID]; exists { + for _, partition := range revoked { + key := fmt.Sprintf("%s:%d", partition.Topic, partition.Partition) + revokedPartitions[key] = true + } + } + + // Add current assignments except revoked ones + for _, assignment := range member.Assignment { + key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition) + if !revokedPartitions[key] { + assignments[member.ID] = append(assignments[member.ID], assignment) + } + } + } + + return assignments +} + +// getCurrentAssignmentsWithRevocations returns current assignments with revocations applied +func (ics *IncrementalCooperativeAssignmentStrategy) getCurrentAssignmentsWithRevocations( + members []*GroupMember, +) map[string][]PartitionAssignment { + return ics.applyRevocations(members, ics.rebalanceState.RevokedPartitions) +} + +// performRegularAssignment performs a regular (non-incremental) assignment as fallback +func (ics *IncrementalCooperativeAssignmentStrategy) performRegularAssignment( + members []*GroupMember, + topicPartitions map[string][]int32, +) map[string][]PartitionAssignment { + // Reset rebalance state + ics.rebalanceState = NewIncrementalRebalanceState() + + // Use regular cooperative-sticky logic + cooperativeSticky := &CooperativeStickyAssignmentStrategy{} + return cooperativeSticky.Assign(members, topicPartitions) +} + +// GetRebalanceState returns the current rebalance state (for monitoring/debugging) +func (ics *IncrementalCooperativeAssignmentStrategy) GetRebalanceState() *IncrementalRebalanceState { + return ics.rebalanceState +} + +// IsRebalanceInProgress returns true if an incremental rebalance is currently in progress +func (ics *IncrementalCooperativeAssignmentStrategy) IsRebalanceInProgress() bool { + return ics.rebalanceState.Phase != RebalancePhaseNone +} + +// ForceCompleteRebalance forces completion of the current rebalance (for timeout scenarios) +func (ics *IncrementalCooperativeAssignmentStrategy) ForceCompleteRebalance() { + ics.rebalanceState.Phase = RebalancePhaseNone + ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment) + ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment) +} |
