diff options
Diffstat (limited to 'weed/mq/kafka/consumer/cooperative_sticky_test.go')
| -rw-r--r-- | weed/mq/kafka/consumer/cooperative_sticky_test.go | 412 |
1 files changed, 412 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/cooperative_sticky_test.go b/weed/mq/kafka/consumer/cooperative_sticky_test.go new file mode 100644 index 000000000..373ff67ec --- /dev/null +++ b/weed/mq/kafka/consumer/cooperative_sticky_test.go @@ -0,0 +1,412 @@ +package consumer + +import ( + "testing" +) + +func TestCooperativeStickyAssignmentStrategy_Name(t *testing.T) { + strategy := &CooperativeStickyAssignmentStrategy{} + if strategy.Name() != "cooperative-sticky" { + t.Errorf("Expected strategy name 'cooperative-sticky', got '%s'", strategy.Name()) + } +} + +func TestCooperativeStickyAssignmentStrategy_InitialAssignment(t *testing.T) { + strategy := &CooperativeStickyAssignmentStrategy{} + + members := []*GroupMember{ + {ID: "member1", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, + {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Verify all partitions are assigned + totalAssigned := 0 + for _, assignment := range assignments { + totalAssigned += len(assignment) + } + + if totalAssigned != 4 { + t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned) + } + + // Verify fair distribution (2 partitions each) + for memberID, assignment := range assignments { + if len(assignment) != 2 { + t.Errorf("Expected member %s to get 2 partitions, got %d", memberID, len(assignment)) + } + } + + // Verify no partition is assigned twice + assignedPartitions := make(map[PartitionAssignment]bool) + for _, assignment := range assignments { + for _, pa := range assignment { + if assignedPartitions[pa] { + t.Errorf("Partition %v assigned multiple times", pa) + } + assignedPartitions[pa] = true + } + } +} + +func TestCooperativeStickyAssignmentStrategy_StickyBehavior(t *testing.T) { + strategy := &CooperativeStickyAssignmentStrategy{} + + // Initial state: member1 has partitions 0,1 and member2 has partitions 2,3 + members := []*GroupMember{ + { + ID: "member1", + Subscription: []string{"topic1"}, + Assignment: []PartitionAssignment{ + {Topic: "topic1", Partition: 0}, + {Topic: "topic1", Partition: 1}, + }, + }, + { + ID: "member2", + Subscription: []string{"topic1"}, + Assignment: []PartitionAssignment{ + {Topic: "topic1", Partition: 2}, + {Topic: "topic1", Partition: 3}, + }, + }, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Verify sticky behavior - existing assignments should be preserved + member1Assignment := assignments["member1"] + member2Assignment := assignments["member2"] + + // Check that member1 still has partitions 0 and 1 + hasPartition0 := false + hasPartition1 := false + for _, pa := range member1Assignment { + if pa.Topic == "topic1" && pa.Partition == 0 { + hasPartition0 = true + } + if pa.Topic == "topic1" && pa.Partition == 1 { + hasPartition1 = true + } + } + + if !hasPartition0 || !hasPartition1 { + t.Errorf("Member1 should retain partitions 0 and 1, got %v", member1Assignment) + } + + // Check that member2 still has partitions 2 and 3 + hasPartition2 := false + hasPartition3 := false + for _, pa := range member2Assignment { + if pa.Topic == "topic1" && pa.Partition == 2 { + hasPartition2 = true + } + if pa.Topic == "topic1" && pa.Partition == 3 { + hasPartition3 = true + } + } + + if !hasPartition2 || !hasPartition3 { + t.Errorf("Member2 should retain partitions 2 and 3, got %v", member2Assignment) + } +} + +func TestCooperativeStickyAssignmentStrategy_NewMemberJoin(t *testing.T) { + strategy := &CooperativeStickyAssignmentStrategy{} + + // Scenario: member1 has all partitions, member2 joins + members := []*GroupMember{ + { + ID: "member1", + Subscription: []string{"topic1"}, + Assignment: []PartitionAssignment{ + {Topic: "topic1", Partition: 0}, + {Topic: "topic1", Partition: 1}, + {Topic: "topic1", Partition: 2}, + {Topic: "topic1", Partition: 3}, + }, + }, + { + ID: "member2", + Subscription: []string{"topic1"}, + Assignment: []PartitionAssignment{}, // New member, no existing assignment + }, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Verify fair redistribution (2 partitions each) + member1Assignment := assignments["member1"] + member2Assignment := assignments["member2"] + + if len(member1Assignment) != 2 { + t.Errorf("Expected member1 to have 2 partitions after rebalance, got %d", len(member1Assignment)) + } + + if len(member2Assignment) != 2 { + t.Errorf("Expected member2 to have 2 partitions after rebalance, got %d", len(member2Assignment)) + } + + // Verify some stickiness - member1 should retain some of its original partitions + originalPartitions := map[int32]bool{0: true, 1: true, 2: true, 3: true} + retainedCount := 0 + for _, pa := range member1Assignment { + if originalPartitions[pa.Partition] { + retainedCount++ + } + } + + if retainedCount == 0 { + t.Error("Member1 should retain at least some of its original partitions (sticky behavior)") + } + + t.Logf("Member1 retained %d out of 4 original partitions", retainedCount) +} + +func TestCooperativeStickyAssignmentStrategy_MemberLeave(t *testing.T) { + strategy := &CooperativeStickyAssignmentStrategy{} + + // Scenario: member2 leaves, member1 should get its partitions + members := []*GroupMember{ + { + ID: "member1", + Subscription: []string{"topic1"}, + Assignment: []PartitionAssignment{ + {Topic: "topic1", Partition: 0}, + {Topic: "topic1", Partition: 1}, + }, + }, + // member2 has left, so it's not in the members list + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, // All partitions still need to be assigned + } + + assignments := strategy.Assign(members, topicPartitions) + + // member1 should get all partitions + member1Assignment := assignments["member1"] + + if len(member1Assignment) != 4 { + t.Errorf("Expected member1 to get all 4 partitions after member2 left, got %d", len(member1Assignment)) + } + + // Verify member1 retained its original partitions (sticky behavior) + hasPartition0 := false + hasPartition1 := false + for _, pa := range member1Assignment { + if pa.Partition == 0 { + hasPartition0 = true + } + if pa.Partition == 1 { + hasPartition1 = true + } + } + + if !hasPartition0 || !hasPartition1 { + t.Error("Member1 should retain its original partitions 0 and 1") + } +} + +func TestCooperativeStickyAssignmentStrategy_MultipleTopics(t *testing.T) { + strategy := &CooperativeStickyAssignmentStrategy{} + + members := []*GroupMember{ + { + ID: "member1", + Subscription: []string{"topic1", "topic2"}, + Assignment: []PartitionAssignment{ + {Topic: "topic1", Partition: 0}, + {Topic: "topic2", Partition: 0}, + }, + }, + { + ID: "member2", + Subscription: []string{"topic1", "topic2"}, + Assignment: []PartitionAssignment{ + {Topic: "topic1", Partition: 1}, + {Topic: "topic2", Partition: 1}, + }, + }, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1}, + "topic2": {0, 1}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Verify all partitions are assigned + totalAssigned := 0 + for _, assignment := range assignments { + totalAssigned += len(assignment) + } + + if totalAssigned != 4 { + t.Errorf("Expected 4 total partitions assigned across both topics, got %d", totalAssigned) + } + + // Verify sticky behavior - each member should retain their original assignments + member1Assignment := assignments["member1"] + member2Assignment := assignments["member2"] + + // Check member1 retains topic1:0 and topic2:0 + hasT1P0 := false + hasT2P0 := false + for _, pa := range member1Assignment { + if pa.Topic == "topic1" && pa.Partition == 0 { + hasT1P0 = true + } + if pa.Topic == "topic2" && pa.Partition == 0 { + hasT2P0 = true + } + } + + if !hasT1P0 || !hasT2P0 { + t.Errorf("Member1 should retain topic1:0 and topic2:0, got %v", member1Assignment) + } + + // Check member2 retains topic1:1 and topic2:1 + hasT1P1 := false + hasT2P1 := false + for _, pa := range member2Assignment { + if pa.Topic == "topic1" && pa.Partition == 1 { + hasT1P1 = true + } + if pa.Topic == "topic2" && pa.Partition == 1 { + hasT2P1 = true + } + } + + if !hasT1P1 || !hasT2P1 { + t.Errorf("Member2 should retain topic1:1 and topic2:1, got %v", member2Assignment) + } +} + +func TestCooperativeStickyAssignmentStrategy_UnevenPartitions(t *testing.T) { + strategy := &CooperativeStickyAssignmentStrategy{} + + // 5 partitions, 2 members - should distribute 3:2 or 2:3 + members := []*GroupMember{ + {ID: "member1", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, + {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3, 4}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Verify all partitions are assigned + totalAssigned := 0 + for _, assignment := range assignments { + totalAssigned += len(assignment) + } + + if totalAssigned != 5 { + t.Errorf("Expected 5 total partitions assigned, got %d", totalAssigned) + } + + // Verify fair distribution + member1Count := len(assignments["member1"]) + member2Count := len(assignments["member2"]) + + // Should be 3:2 or 2:3 distribution + if !((member1Count == 3 && member2Count == 2) || (member1Count == 2 && member2Count == 3)) { + t.Errorf("Expected 3:2 or 2:3 distribution, got %d:%d", member1Count, member2Count) + } +} + +func TestCooperativeStickyAssignmentStrategy_PartialSubscription(t *testing.T) { + strategy := &CooperativeStickyAssignmentStrategy{} + + // member1 subscribes to both topics, member2 only to topic1 + members := []*GroupMember{ + {ID: "member1", Subscription: []string{"topic1", "topic2"}, Assignment: []PartitionAssignment{}}, + {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1}, + "topic2": {0, 1}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // member1 should get all topic2 partitions since member2 isn't subscribed + member1Assignment := assignments["member1"] + member2Assignment := assignments["member2"] + + // Count topic2 partitions for each member + member1Topic2Count := 0 + member2Topic2Count := 0 + + for _, pa := range member1Assignment { + if pa.Topic == "topic2" { + member1Topic2Count++ + } + } + + for _, pa := range member2Assignment { + if pa.Topic == "topic2" { + member2Topic2Count++ + } + } + + if member1Topic2Count != 2 { + t.Errorf("Expected member1 to get all 2 topic2 partitions, got %d", member1Topic2Count) + } + + if member2Topic2Count != 0 { + t.Errorf("Expected member2 to get 0 topic2 partitions (not subscribed), got %d", member2Topic2Count) + } + + // Both members should get some topic1 partitions + member1Topic1Count := 0 + member2Topic1Count := 0 + + for _, pa := range member1Assignment { + if pa.Topic == "topic1" { + member1Topic1Count++ + } + } + + for _, pa := range member2Assignment { + if pa.Topic == "topic1" { + member2Topic1Count++ + } + } + + if member1Topic1Count + member2Topic1Count != 2 { + t.Errorf("Expected all topic1 partitions to be assigned, got %d + %d = %d", + member1Topic1Count, member2Topic1Count, member1Topic1Count + member2Topic1Count) + } +} + +func TestGetAssignmentStrategy_CooperativeSticky(t *testing.T) { + strategy := GetAssignmentStrategy("cooperative-sticky") + if strategy.Name() != "cooperative-sticky" { + t.Errorf("Expected cooperative-sticky strategy, got %s", strategy.Name()) + } + + // Verify it's the correct type + if _, ok := strategy.(*CooperativeStickyAssignmentStrategy); !ok { + t.Errorf("Expected CooperativeStickyAssignmentStrategy, got %T", strategy) + } +} |
