aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/incremental_rebalancing.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer/incremental_rebalancing.go')
-rw-r--r--weed/mq/kafka/consumer/incremental_rebalancing.go23
1 files changed, 11 insertions, 12 deletions
diff --git a/weed/mq/kafka/consumer/incremental_rebalancing.go b/weed/mq/kafka/consumer/incremental_rebalancing.go
index 10c794375..49509bc76 100644
--- a/weed/mq/kafka/consumer/incremental_rebalancing.go
+++ b/weed/mq/kafka/consumer/incremental_rebalancing.go
@@ -31,8 +31,8 @@ func (rp RebalancePhase) String() string {
// IncrementalRebalanceState tracks the state of incremental cooperative rebalancing
type IncrementalRebalanceState struct {
Phase RebalancePhase
- RevocationGeneration int32 // Generation when revocation started
- AssignmentGeneration int32 // Generation when assignment started
+ RevocationGeneration int32 // Generation when revocation started
+ AssignmentGeneration int32 // Generation when assignment started
RevokedPartitions map[string][]PartitionAssignment // Member ID -> revoked partitions
PendingAssignments map[string][]PartitionAssignment // Member ID -> pending assignments
StartTime time.Time
@@ -64,7 +64,7 @@ func NewIncrementalCooperativeAssignmentStrategy() *IncrementalCooperativeAssign
}
func (ics *IncrementalCooperativeAssignmentStrategy) Name() string {
- return "cooperative-sticky"
+ return ProtocolNameCooperativeSticky
}
func (ics *IncrementalCooperativeAssignmentStrategy) Assign(
@@ -99,10 +99,10 @@ func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance(
) map[string][]PartitionAssignment {
// Calculate ideal assignment
idealAssignment := ics.calculateIdealAssignment(members, topicPartitions)
-
+
// Determine which partitions need to be revoked
partitionsToRevoke := ics.calculateRevocations(members, idealAssignment)
-
+
if len(partitionsToRevoke) == 0 {
// No revocations needed, proceed with regular assignment
return idealAssignment
@@ -112,7 +112,7 @@ func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance(
ics.rebalanceState.Phase = RebalancePhaseRevocation
ics.rebalanceState.StartTime = time.Now()
ics.rebalanceState.RevokedPartitions = partitionsToRevoke
-
+
// Return current assignments minus revoked partitions
return ics.applyRevocations(members, partitionsToRevoke)
}
@@ -140,12 +140,12 @@ func (ics *IncrementalCooperativeAssignmentStrategy) handleAssignmentPhase(
) map[string][]PartitionAssignment {
// Calculate final assignment including previously revoked partitions
finalAssignment := ics.calculateIdealAssignment(members, topicPartitions)
-
+
// Complete the rebalance
ics.rebalanceState.Phase = RebalancePhaseNone
ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment)
ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment)
-
+
return finalAssignment
}
@@ -333,10 +333,9 @@ func (ics *IncrementalCooperativeAssignmentStrategy) performRegularAssignment(
) map[string][]PartitionAssignment {
// Reset rebalance state
ics.rebalanceState = NewIncrementalRebalanceState()
-
- // Use regular cooperative-sticky logic
- cooperativeSticky := &CooperativeStickyAssignmentStrategy{}
- return cooperativeSticky.Assign(members, topicPartitions)
+
+ // Use ideal assignment calculation (non-incremental cooperative assignment)
+ return ics.calculateIdealAssignment(members, topicPartitions)
}
// GetRebalanceState returns the current rebalance state (for monitoring/debugging)