aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/incremental_rebalancing_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer/incremental_rebalancing_test.go')
-rw-r--r--weed/mq/kafka/consumer/incremental_rebalancing_test.go399
1 files changed, 399 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/incremental_rebalancing_test.go b/weed/mq/kafka/consumer/incremental_rebalancing_test.go
new file mode 100644
index 000000000..1352b2da0
--- /dev/null
+++ b/weed/mq/kafka/consumer/incremental_rebalancing_test.go
@@ -0,0 +1,399 @@
+package consumer
+
+import (
+ "fmt"
+ "testing"
+ "time"
+)
+
+func TestIncrementalCooperativeAssignmentStrategy_BasicAssignment(t *testing.T) {
+ strategy := NewIncrementalCooperativeAssignmentStrategy()
+
+ // Create members
+ members := []*GroupMember{
+ {
+ ID: "member-1",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{}, // No existing assignment
+ },
+ {
+ ID: "member-2",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{}, // No existing assignment
+ },
+ }
+
+ // Topic partitions
+ topicPartitions := map[string][]int32{
+ "topic-1": {0, 1, 2, 3},
+ }
+
+ // First assignment (no existing assignments, should be direct)
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // Verify assignments
+ if len(assignments) != 2 {
+ t.Errorf("Expected 2 member assignments, got %d", len(assignments))
+ }
+
+ totalPartitions := 0
+ for memberID, partitions := range assignments {
+ t.Logf("Member %s assigned %d partitions: %v", memberID, len(partitions), partitions)
+ totalPartitions += len(partitions)
+ }
+
+ if totalPartitions != 4 {
+ t.Errorf("Expected 4 total partitions assigned, got %d", totalPartitions)
+ }
+
+ // Should not be in rebalance state for initial assignment
+ if strategy.IsRebalanceInProgress() {
+ t.Error("Expected no rebalance in progress for initial assignment")
+ }
+}
+
+func TestIncrementalCooperativeAssignmentStrategy_RebalanceWithRevocation(t *testing.T) {
+ strategy := NewIncrementalCooperativeAssignmentStrategy()
+
+ // Create members with existing assignments
+ members := []*GroupMember{
+ {
+ ID: "member-1",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic-1", Partition: 0},
+ {Topic: "topic-1", Partition: 1},
+ {Topic: "topic-1", Partition: 2},
+ {Topic: "topic-1", Partition: 3}, // This member has all partitions
+ },
+ },
+ {
+ ID: "member-2",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{}, // New member with no assignments
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic-1": {0, 1, 2, 3},
+ }
+
+ // First call should start revocation phase
+ assignments1 := strategy.Assign(members, topicPartitions)
+
+ // Should be in revocation phase
+ if !strategy.IsRebalanceInProgress() {
+ t.Error("Expected rebalance to be in progress")
+ }
+
+ state := strategy.GetRebalanceState()
+ if state.Phase != RebalancePhaseRevocation {
+ t.Errorf("Expected revocation phase, got %s", state.Phase)
+ }
+
+ // Member-1 should have some partitions revoked
+ member1Assignments := assignments1["member-1"]
+ if len(member1Assignments) >= 4 {
+ t.Errorf("Expected member-1 to have fewer than 4 partitions after revocation, got %d", len(member1Assignments))
+ }
+
+ // Member-2 should still have no assignments during revocation
+ member2Assignments := assignments1["member-2"]
+ if len(member2Assignments) != 0 {
+ t.Errorf("Expected member-2 to have 0 partitions during revocation, got %d", len(member2Assignments))
+ }
+
+ t.Logf("Revocation phase - Member-1: %d partitions, Member-2: %d partitions",
+ len(member1Assignments), len(member2Assignments))
+
+ // Simulate time passing and second call (should move to assignment phase)
+ time.Sleep(10 * time.Millisecond)
+
+ // Force move to assignment phase by setting timeout to 0
+ state.RevocationTimeout = 0
+
+ assignments2 := strategy.Assign(members, topicPartitions)
+
+ // Should complete rebalance
+ if strategy.IsRebalanceInProgress() {
+ t.Error("Expected rebalance to be completed")
+ }
+
+ // Both members should have partitions now
+ member1FinalAssignments := assignments2["member-1"]
+ member2FinalAssignments := assignments2["member-2"]
+
+ if len(member1FinalAssignments) == 0 {
+ t.Error("Expected member-1 to have some partitions after rebalance")
+ }
+
+ if len(member2FinalAssignments) == 0 {
+ t.Error("Expected member-2 to have some partitions after rebalance")
+ }
+
+ totalFinalPartitions := len(member1FinalAssignments) + len(member2FinalAssignments)
+ if totalFinalPartitions != 4 {
+ t.Errorf("Expected 4 total partitions after rebalance, got %d", totalFinalPartitions)
+ }
+
+ t.Logf("Final assignment - Member-1: %d partitions, Member-2: %d partitions",
+ len(member1FinalAssignments), len(member2FinalAssignments))
+}
+
+func TestIncrementalCooperativeAssignmentStrategy_NoRevocationNeeded(t *testing.T) {
+ strategy := NewIncrementalCooperativeAssignmentStrategy()
+
+ // Create members with already balanced assignments
+ members := []*GroupMember{
+ {
+ ID: "member-1",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic-1", Partition: 0},
+ {Topic: "topic-1", Partition: 1},
+ },
+ },
+ {
+ ID: "member-2",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic-1", Partition: 2},
+ {Topic: "topic-1", Partition: 3},
+ },
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic-1": {0, 1, 2, 3},
+ }
+
+ // Assignment should not trigger rebalance
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // Should not be in rebalance state
+ if strategy.IsRebalanceInProgress() {
+ t.Error("Expected no rebalance in progress when assignments are already balanced")
+ }
+
+ // Assignments should remain the same
+ member1Assignments := assignments["member-1"]
+ member2Assignments := assignments["member-2"]
+
+ if len(member1Assignments) != 2 {
+ t.Errorf("Expected member-1 to keep 2 partitions, got %d", len(member1Assignments))
+ }
+
+ if len(member2Assignments) != 2 {
+ t.Errorf("Expected member-2 to keep 2 partitions, got %d", len(member2Assignments))
+ }
+}
+
+func TestIncrementalCooperativeAssignmentStrategy_MultipleTopics(t *testing.T) {
+ strategy := NewIncrementalCooperativeAssignmentStrategy()
+
+ // Create members with mixed topic subscriptions
+ members := []*GroupMember{
+ {
+ ID: "member-1",
+ Subscription: []string{"topic-1", "topic-2"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic-1", Partition: 0},
+ {Topic: "topic-1", Partition: 1},
+ {Topic: "topic-2", Partition: 0},
+ },
+ },
+ {
+ ID: "member-2",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic-1", Partition: 2},
+ },
+ },
+ {
+ ID: "member-3",
+ Subscription: []string{"topic-2"},
+ Assignment: []PartitionAssignment{}, // New member
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic-1": {0, 1, 2},
+ "topic-2": {0, 1},
+ }
+
+ // Should trigger rebalance to distribute topic-2 partitions
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // Verify all partitions are assigned
+ allAssignedPartitions := make(map[string]bool)
+ for _, memberAssignments := range assignments {
+ for _, assignment := range memberAssignments {
+ key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition)
+ allAssignedPartitions[key] = true
+ }
+ }
+
+ expectedPartitions := []string{"topic-1:0", "topic-1:1", "topic-1:2", "topic-2:0", "topic-2:1"}
+ for _, expected := range expectedPartitions {
+ if !allAssignedPartitions[expected] {
+ t.Errorf("Expected partition %s to be assigned", expected)
+ }
+ }
+
+ // Debug: Print all assigned partitions
+ t.Logf("All assigned partitions: %v", allAssignedPartitions)
+}
+
+func TestIncrementalCooperativeAssignmentStrategy_ForceComplete(t *testing.T) {
+ strategy := NewIncrementalCooperativeAssignmentStrategy()
+
+ // Start a rebalance - create scenario where member-1 has all partitions but member-2 joins
+ members := []*GroupMember{
+ {
+ ID: "member-1",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic-1", Partition: 0},
+ {Topic: "topic-1", Partition: 1},
+ {Topic: "topic-1", Partition: 2},
+ {Topic: "topic-1", Partition: 3},
+ },
+ },
+ {
+ ID: "member-2",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{}, // New member
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic-1": {0, 1, 2, 3},
+ }
+
+ // This should start a rebalance (member-2 needs partitions)
+ strategy.Assign(members, topicPartitions)
+
+ if !strategy.IsRebalanceInProgress() {
+ t.Error("Expected rebalance to be in progress")
+ }
+
+ // Force complete the rebalance
+ strategy.ForceCompleteRebalance()
+
+ if strategy.IsRebalanceInProgress() {
+ t.Error("Expected rebalance to be completed after force complete")
+ }
+
+ state := strategy.GetRebalanceState()
+ if state.Phase != RebalancePhaseNone {
+ t.Errorf("Expected phase to be None after force complete, got %s", state.Phase)
+ }
+}
+
+func TestIncrementalCooperativeAssignmentStrategy_RevocationTimeout(t *testing.T) {
+ strategy := NewIncrementalCooperativeAssignmentStrategy()
+
+ // Set a very short revocation timeout for testing
+ strategy.rebalanceState.RevocationTimeout = 1 * time.Millisecond
+
+ members := []*GroupMember{
+ {
+ ID: "member-1",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic-1", Partition: 0},
+ {Topic: "topic-1", Partition: 1},
+ {Topic: "topic-1", Partition: 2},
+ {Topic: "topic-1", Partition: 3},
+ },
+ },
+ {
+ ID: "member-2",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{},
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic-1": {0, 1, 2, 3},
+ }
+
+ // First call starts revocation
+ strategy.Assign(members, topicPartitions)
+
+ if !strategy.IsRebalanceInProgress() {
+ t.Error("Expected rebalance to be in progress")
+ }
+
+ // Wait for timeout
+ time.Sleep(5 * time.Millisecond)
+
+ // Second call should complete due to timeout
+ assignments := strategy.Assign(members, topicPartitions)
+
+ if strategy.IsRebalanceInProgress() {
+ t.Error("Expected rebalance to be completed after timeout")
+ }
+
+ // Both members should have partitions
+ member1Assignments := assignments["member-1"]
+ member2Assignments := assignments["member-2"]
+
+ if len(member1Assignments) == 0 {
+ t.Error("Expected member-1 to have partitions after timeout")
+ }
+
+ if len(member2Assignments) == 0 {
+ t.Error("Expected member-2 to have partitions after timeout")
+ }
+}
+
+func TestIncrementalCooperativeAssignmentStrategy_StateTransitions(t *testing.T) {
+ strategy := NewIncrementalCooperativeAssignmentStrategy()
+
+ // Initial state should be None
+ state := strategy.GetRebalanceState()
+ if state.Phase != RebalancePhaseNone {
+ t.Errorf("Expected initial phase to be None, got %s", state.Phase)
+ }
+
+ // Create scenario that requires rebalancing
+ members := []*GroupMember{
+ {
+ ID: "member-1",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic-1", Partition: 0},
+ {Topic: "topic-1", Partition: 1},
+ {Topic: "topic-1", Partition: 2},
+ {Topic: "topic-1", Partition: 3},
+ },
+ },
+ {
+ ID: "member-2",
+ Subscription: []string{"topic-1"},
+ Assignment: []PartitionAssignment{}, // New member
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic-1": {0, 1, 2, 3}, // Same partitions, but need rebalancing due to new member
+ }
+
+ // First call should move to revocation phase
+ strategy.Assign(members, topicPartitions)
+ state = strategy.GetRebalanceState()
+ if state.Phase != RebalancePhaseRevocation {
+ t.Errorf("Expected phase to be Revocation, got %s", state.Phase)
+ }
+
+ // Force timeout to move to assignment phase
+ state.RevocationTimeout = 0
+ strategy.Assign(members, topicPartitions)
+
+ // Should complete and return to None
+ state = strategy.GetRebalanceState()
+ if state.Phase != RebalancePhaseNone {
+ t.Errorf("Expected phase to be None after completion, got %s", state.Phase)
+ }
+}