aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/cooperative_sticky_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer/cooperative_sticky_test.go')
-rw-r--r--weed/mq/kafka/consumer/cooperative_sticky_test.go412
1 files changed, 412 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/cooperative_sticky_test.go b/weed/mq/kafka/consumer/cooperative_sticky_test.go
new file mode 100644
index 000000000..373ff67ec
--- /dev/null
+++ b/weed/mq/kafka/consumer/cooperative_sticky_test.go
@@ -0,0 +1,412 @@
+package consumer
+
+import (
+ "testing"
+)
+
+func TestCooperativeStickyAssignmentStrategy_Name(t *testing.T) {
+ strategy := &CooperativeStickyAssignmentStrategy{}
+ if strategy.Name() != "cooperative-sticky" {
+ t.Errorf("Expected strategy name 'cooperative-sticky', got '%s'", strategy.Name())
+ }
+}
+
+func TestCooperativeStickyAssignmentStrategy_InitialAssignment(t *testing.T) {
+ strategy := &CooperativeStickyAssignmentStrategy{}
+
+ members := []*GroupMember{
+ {ID: "member1", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
+ {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic1": {0, 1, 2, 3},
+ }
+
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // Verify all partitions are assigned
+ totalAssigned := 0
+ for _, assignment := range assignments {
+ totalAssigned += len(assignment)
+ }
+
+ if totalAssigned != 4 {
+ t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
+ }
+
+ // Verify fair distribution (2 partitions each)
+ for memberID, assignment := range assignments {
+ if len(assignment) != 2 {
+ t.Errorf("Expected member %s to get 2 partitions, got %d", memberID, len(assignment))
+ }
+ }
+
+ // Verify no partition is assigned twice
+ assignedPartitions := make(map[PartitionAssignment]bool)
+ for _, assignment := range assignments {
+ for _, pa := range assignment {
+ if assignedPartitions[pa] {
+ t.Errorf("Partition %v assigned multiple times", pa)
+ }
+ assignedPartitions[pa] = true
+ }
+ }
+}
+
+func TestCooperativeStickyAssignmentStrategy_StickyBehavior(t *testing.T) {
+ strategy := &CooperativeStickyAssignmentStrategy{}
+
+ // Initial state: member1 has partitions 0,1 and member2 has partitions 2,3
+ members := []*GroupMember{
+ {
+ ID: "member1",
+ Subscription: []string{"topic1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic1", Partition: 0},
+ {Topic: "topic1", Partition: 1},
+ },
+ },
+ {
+ ID: "member2",
+ Subscription: []string{"topic1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic1", Partition: 2},
+ {Topic: "topic1", Partition: 3},
+ },
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic1": {0, 1, 2, 3},
+ }
+
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // Verify sticky behavior - existing assignments should be preserved
+ member1Assignment := assignments["member1"]
+ member2Assignment := assignments["member2"]
+
+ // Check that member1 still has partitions 0 and 1
+ hasPartition0 := false
+ hasPartition1 := false
+ for _, pa := range member1Assignment {
+ if pa.Topic == "topic1" && pa.Partition == 0 {
+ hasPartition0 = true
+ }
+ if pa.Topic == "topic1" && pa.Partition == 1 {
+ hasPartition1 = true
+ }
+ }
+
+ if !hasPartition0 || !hasPartition1 {
+ t.Errorf("Member1 should retain partitions 0 and 1, got %v", member1Assignment)
+ }
+
+ // Check that member2 still has partitions 2 and 3
+ hasPartition2 := false
+ hasPartition3 := false
+ for _, pa := range member2Assignment {
+ if pa.Topic == "topic1" && pa.Partition == 2 {
+ hasPartition2 = true
+ }
+ if pa.Topic == "topic1" && pa.Partition == 3 {
+ hasPartition3 = true
+ }
+ }
+
+ if !hasPartition2 || !hasPartition3 {
+ t.Errorf("Member2 should retain partitions 2 and 3, got %v", member2Assignment)
+ }
+}
+
+func TestCooperativeStickyAssignmentStrategy_NewMemberJoin(t *testing.T) {
+ strategy := &CooperativeStickyAssignmentStrategy{}
+
+ // Scenario: member1 has all partitions, member2 joins
+ members := []*GroupMember{
+ {
+ ID: "member1",
+ Subscription: []string{"topic1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic1", Partition: 0},
+ {Topic: "topic1", Partition: 1},
+ {Topic: "topic1", Partition: 2},
+ {Topic: "topic1", Partition: 3},
+ },
+ },
+ {
+ ID: "member2",
+ Subscription: []string{"topic1"},
+ Assignment: []PartitionAssignment{}, // New member, no existing assignment
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic1": {0, 1, 2, 3},
+ }
+
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // Verify fair redistribution (2 partitions each)
+ member1Assignment := assignments["member1"]
+ member2Assignment := assignments["member2"]
+
+ if len(member1Assignment) != 2 {
+ t.Errorf("Expected member1 to have 2 partitions after rebalance, got %d", len(member1Assignment))
+ }
+
+ if len(member2Assignment) != 2 {
+ t.Errorf("Expected member2 to have 2 partitions after rebalance, got %d", len(member2Assignment))
+ }
+
+ // Verify some stickiness - member1 should retain some of its original partitions
+ originalPartitions := map[int32]bool{0: true, 1: true, 2: true, 3: true}
+ retainedCount := 0
+ for _, pa := range member1Assignment {
+ if originalPartitions[pa.Partition] {
+ retainedCount++
+ }
+ }
+
+ if retainedCount == 0 {
+ t.Error("Member1 should retain at least some of its original partitions (sticky behavior)")
+ }
+
+ t.Logf("Member1 retained %d out of 4 original partitions", retainedCount)
+}
+
+func TestCooperativeStickyAssignmentStrategy_MemberLeave(t *testing.T) {
+ strategy := &CooperativeStickyAssignmentStrategy{}
+
+ // Scenario: member2 leaves, member1 should get its partitions
+ members := []*GroupMember{
+ {
+ ID: "member1",
+ Subscription: []string{"topic1"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic1", Partition: 0},
+ {Topic: "topic1", Partition: 1},
+ },
+ },
+ // member2 has left, so it's not in the members list
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic1": {0, 1, 2, 3}, // All partitions still need to be assigned
+ }
+
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // member1 should get all partitions
+ member1Assignment := assignments["member1"]
+
+ if len(member1Assignment) != 4 {
+ t.Errorf("Expected member1 to get all 4 partitions after member2 left, got %d", len(member1Assignment))
+ }
+
+ // Verify member1 retained its original partitions (sticky behavior)
+ hasPartition0 := false
+ hasPartition1 := false
+ for _, pa := range member1Assignment {
+ if pa.Partition == 0 {
+ hasPartition0 = true
+ }
+ if pa.Partition == 1 {
+ hasPartition1 = true
+ }
+ }
+
+ if !hasPartition0 || !hasPartition1 {
+ t.Error("Member1 should retain its original partitions 0 and 1")
+ }
+}
+
+func TestCooperativeStickyAssignmentStrategy_MultipleTopics(t *testing.T) {
+ strategy := &CooperativeStickyAssignmentStrategy{}
+
+ members := []*GroupMember{
+ {
+ ID: "member1",
+ Subscription: []string{"topic1", "topic2"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic1", Partition: 0},
+ {Topic: "topic2", Partition: 0},
+ },
+ },
+ {
+ ID: "member2",
+ Subscription: []string{"topic1", "topic2"},
+ Assignment: []PartitionAssignment{
+ {Topic: "topic1", Partition: 1},
+ {Topic: "topic2", Partition: 1},
+ },
+ },
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic1": {0, 1},
+ "topic2": {0, 1},
+ }
+
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // Verify all partitions are assigned
+ totalAssigned := 0
+ for _, assignment := range assignments {
+ totalAssigned += len(assignment)
+ }
+
+ if totalAssigned != 4 {
+ t.Errorf("Expected 4 total partitions assigned across both topics, got %d", totalAssigned)
+ }
+
+ // Verify sticky behavior - each member should retain their original assignments
+ member1Assignment := assignments["member1"]
+ member2Assignment := assignments["member2"]
+
+ // Check member1 retains topic1:0 and topic2:0
+ hasT1P0 := false
+ hasT2P0 := false
+ for _, pa := range member1Assignment {
+ if pa.Topic == "topic1" && pa.Partition == 0 {
+ hasT1P0 = true
+ }
+ if pa.Topic == "topic2" && pa.Partition == 0 {
+ hasT2P0 = true
+ }
+ }
+
+ if !hasT1P0 || !hasT2P0 {
+ t.Errorf("Member1 should retain topic1:0 and topic2:0, got %v", member1Assignment)
+ }
+
+ // Check member2 retains topic1:1 and topic2:1
+ hasT1P1 := false
+ hasT2P1 := false
+ for _, pa := range member2Assignment {
+ if pa.Topic == "topic1" && pa.Partition == 1 {
+ hasT1P1 = true
+ }
+ if pa.Topic == "topic2" && pa.Partition == 1 {
+ hasT2P1 = true
+ }
+ }
+
+ if !hasT1P1 || !hasT2P1 {
+ t.Errorf("Member2 should retain topic1:1 and topic2:1, got %v", member2Assignment)
+ }
+}
+
+func TestCooperativeStickyAssignmentStrategy_UnevenPartitions(t *testing.T) {
+ strategy := &CooperativeStickyAssignmentStrategy{}
+
+ // 5 partitions, 2 members - should distribute 3:2 or 2:3
+ members := []*GroupMember{
+ {ID: "member1", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
+ {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic1": {0, 1, 2, 3, 4},
+ }
+
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // Verify all partitions are assigned
+ totalAssigned := 0
+ for _, assignment := range assignments {
+ totalAssigned += len(assignment)
+ }
+
+ if totalAssigned != 5 {
+ t.Errorf("Expected 5 total partitions assigned, got %d", totalAssigned)
+ }
+
+ // Verify fair distribution
+ member1Count := len(assignments["member1"])
+ member2Count := len(assignments["member2"])
+
+ // Should be 3:2 or 2:3 distribution
+ if !((member1Count == 3 && member2Count == 2) || (member1Count == 2 && member2Count == 3)) {
+ t.Errorf("Expected 3:2 or 2:3 distribution, got %d:%d", member1Count, member2Count)
+ }
+}
+
+func TestCooperativeStickyAssignmentStrategy_PartialSubscription(t *testing.T) {
+ strategy := &CooperativeStickyAssignmentStrategy{}
+
+ // member1 subscribes to both topics, member2 only to topic1
+ members := []*GroupMember{
+ {ID: "member1", Subscription: []string{"topic1", "topic2"}, Assignment: []PartitionAssignment{}},
+ {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
+ }
+
+ topicPartitions := map[string][]int32{
+ "topic1": {0, 1},
+ "topic2": {0, 1},
+ }
+
+ assignments := strategy.Assign(members, topicPartitions)
+
+ // member1 should get all topic2 partitions since member2 isn't subscribed
+ member1Assignment := assignments["member1"]
+ member2Assignment := assignments["member2"]
+
+ // Count topic2 partitions for each member
+ member1Topic2Count := 0
+ member2Topic2Count := 0
+
+ for _, pa := range member1Assignment {
+ if pa.Topic == "topic2" {
+ member1Topic2Count++
+ }
+ }
+
+ for _, pa := range member2Assignment {
+ if pa.Topic == "topic2" {
+ member2Topic2Count++
+ }
+ }
+
+ if member1Topic2Count != 2 {
+ t.Errorf("Expected member1 to get all 2 topic2 partitions, got %d", member1Topic2Count)
+ }
+
+ if member2Topic2Count != 0 {
+ t.Errorf("Expected member2 to get 0 topic2 partitions (not subscribed), got %d", member2Topic2Count)
+ }
+
+ // Both members should get some topic1 partitions
+ member1Topic1Count := 0
+ member2Topic1Count := 0
+
+ for _, pa := range member1Assignment {
+ if pa.Topic == "topic1" {
+ member1Topic1Count++
+ }
+ }
+
+ for _, pa := range member2Assignment {
+ if pa.Topic == "topic1" {
+ member2Topic1Count++
+ }
+ }
+
+ if member1Topic1Count + member2Topic1Count != 2 {
+ t.Errorf("Expected all topic1 partitions to be assigned, got %d + %d = %d",
+ member1Topic1Count, member2Topic1Count, member1Topic1Count + member2Topic1Count)
+ }
+}
+
+func TestGetAssignmentStrategy_CooperativeSticky(t *testing.T) {
+ strategy := GetAssignmentStrategy("cooperative-sticky")
+ if strategy.Name() != "cooperative-sticky" {
+ t.Errorf("Expected cooperative-sticky strategy, got %s", strategy.Name())
+ }
+
+ // Verify it's the correct type
+ if _, ok := strategy.(*CooperativeStickyAssignmentStrategy); !ok {
+ t.Errorf("Expected CooperativeStickyAssignmentStrategy, got %T", strategy)
+ }
+}