diff options
Diffstat (limited to 'weed/mq/kafka/consumer/incremental_rebalancing.go')
| -rw-r--r-- | weed/mq/kafka/consumer/incremental_rebalancing.go | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/weed/mq/kafka/consumer/incremental_rebalancing.go b/weed/mq/kafka/consumer/incremental_rebalancing.go index 10c794375..49509bc76 100644 --- a/weed/mq/kafka/consumer/incremental_rebalancing.go +++ b/weed/mq/kafka/consumer/incremental_rebalancing.go @@ -31,8 +31,8 @@ func (rp RebalancePhase) String() string { // IncrementalRebalanceState tracks the state of incremental cooperative rebalancing type IncrementalRebalanceState struct { Phase RebalancePhase - RevocationGeneration int32 // Generation when revocation started - AssignmentGeneration int32 // Generation when assignment started + RevocationGeneration int32 // Generation when revocation started + AssignmentGeneration int32 // Generation when assignment started RevokedPartitions map[string][]PartitionAssignment // Member ID -> revoked partitions PendingAssignments map[string][]PartitionAssignment // Member ID -> pending assignments StartTime time.Time @@ -64,7 +64,7 @@ func NewIncrementalCooperativeAssignmentStrategy() *IncrementalCooperativeAssign } func (ics *IncrementalCooperativeAssignmentStrategy) Name() string { - return "cooperative-sticky" + return ProtocolNameCooperativeSticky } func (ics *IncrementalCooperativeAssignmentStrategy) Assign( @@ -99,10 +99,10 @@ func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance( ) map[string][]PartitionAssignment { // Calculate ideal assignment idealAssignment := ics.calculateIdealAssignment(members, topicPartitions) - + // Determine which partitions need to be revoked partitionsToRevoke := ics.calculateRevocations(members, idealAssignment) - + if len(partitionsToRevoke) == 0 { // No revocations needed, proceed with regular assignment return idealAssignment @@ -112,7 +112,7 @@ func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance( ics.rebalanceState.Phase = RebalancePhaseRevocation ics.rebalanceState.StartTime = time.Now() ics.rebalanceState.RevokedPartitions = partitionsToRevoke - + // Return current assignments minus revoked partitions return ics.applyRevocations(members, partitionsToRevoke) } @@ -140,12 +140,12 @@ func (ics *IncrementalCooperativeAssignmentStrategy) handleAssignmentPhase( ) map[string][]PartitionAssignment { // Calculate final assignment including previously revoked partitions finalAssignment := ics.calculateIdealAssignment(members, topicPartitions) - + // Complete the rebalance ics.rebalanceState.Phase = RebalancePhaseNone ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment) ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment) - + return finalAssignment } @@ -333,10 +333,9 @@ func (ics *IncrementalCooperativeAssignmentStrategy) performRegularAssignment( ) map[string][]PartitionAssignment { // Reset rebalance state ics.rebalanceState = NewIncrementalRebalanceState() - - // Use regular cooperative-sticky logic - cooperativeSticky := &CooperativeStickyAssignmentStrategy{} - return cooperativeSticky.Assign(members, topicPartitions) + + // Use ideal assignment calculation (non-incremental cooperative assignment) + return ics.calculateIdealAssignment(members, topicPartitions) } // GetRebalanceState returns the current rebalance state (for monitoring/debugging) |
