diff options
Diffstat (limited to 'weed/mq/kafka/consumer/assignment_test.go')
| -rw-r--r-- | weed/mq/kafka/consumer/assignment_test.go | 128 |
1 files changed, 64 insertions, 64 deletions
diff --git a/weed/mq/kafka/consumer/assignment_test.go b/weed/mq/kafka/consumer/assignment_test.go index 520200ed3..14200366f 100644 --- a/weed/mq/kafka/consumer/assignment_test.go +++ b/weed/mq/kafka/consumer/assignment_test.go @@ -8,11 +8,11 @@ import ( func TestRangeAssignmentStrategy(t *testing.T) { strategy := &RangeAssignmentStrategy{} - - if strategy.Name() != "range" { - t.Errorf("Expected strategy name 'range', got '%s'", strategy.Name()) + + if strategy.Name() != ProtocolNameRange { + t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameRange, strategy.Name()) } - + // Test with 2 members, 4 partitions on one topic members := []*GroupMember{ { @@ -20,38 +20,38 @@ func TestRangeAssignmentStrategy(t *testing.T) { Subscription: []string{"topic1"}, }, { - ID: "member2", + ID: "member2", Subscription: []string{"topic1"}, }, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Verify all members have assignments if len(assignments) != 2 { t.Fatalf("Expected assignments for 2 members, got %d", len(assignments)) } - + // Verify total partitions assigned totalAssigned := 0 for _, assignment := range assignments { totalAssigned += len(assignment) } - + if totalAssigned != 4 { t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned) } - + // Range assignment should distribute evenly: 2 partitions each for memberID, assignment := range assignments { if len(assignment) != 2 { t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment)) } - + // Verify all assignments are for the subscribed topic for _, pa := range assignment { if pa.Topic != "topic1" { @@ -63,27 +63,27 @@ func TestRangeAssignmentStrategy(t *testing.T) { func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) { strategy := &RangeAssignmentStrategy{} - + // Test with 3 members, 4 partitions - should distribute 2,1,1 members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1"}}, {ID: "member2", Subscription: []string{"topic1"}}, {ID: "member3", Subscription: []string{"topic1"}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Get assignment counts counts := make([]int, 0, 3) for _, assignment := range assignments { counts = append(counts, len(assignment)) } sort.Ints(counts) - + // Should be distributed as [1, 1, 2] (first member gets extra partition) expected := []int{1, 1, 2} if !reflect.DeepEqual(counts, expected) { @@ -93,30 +93,30 @@ func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) { func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) { strategy := &RangeAssignmentStrategy{} - + members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1", "topic2"}}, {ID: "member2", Subscription: []string{"topic1"}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1}, "topic2": {0, 1}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Member1 should get assignments from both topics member1Assignments := assignments["member1"] topicsAssigned := make(map[string]int) for _, pa := range member1Assignments { topicsAssigned[pa.Topic]++ } - + if len(topicsAssigned) != 2 { t.Errorf("Expected member1 to be assigned to 2 topics, got %d", len(topicsAssigned)) } - + // Member2 should only get topic1 assignments member2Assignments := assignments["member2"] for _, pa := range member2Assignments { @@ -128,38 +128,38 @@ func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) { func TestRoundRobinAssignmentStrategy(t *testing.T) { strategy := &RoundRobinAssignmentStrategy{} - - if strategy.Name() != "roundrobin" { - t.Errorf("Expected strategy name 'roundrobin', got '%s'", strategy.Name()) + + if strategy.Name() != ProtocolNameRoundRobin { + t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameRoundRobin, strategy.Name()) } - + // Test with 2 members, 4 partitions on one topic members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1"}}, {ID: "member2", Subscription: []string{"topic1"}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Verify all members have assignments if len(assignments) != 2 { t.Fatalf("Expected assignments for 2 members, got %d", len(assignments)) } - + // Verify total partitions assigned totalAssigned := 0 for _, assignment := range assignments { totalAssigned += len(assignment) } - + if totalAssigned != 4 { t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned) } - + // Round robin should distribute evenly: 2 partitions each for memberID, assignment := range assignments { if len(assignment) != 2 { @@ -170,26 +170,26 @@ func TestRoundRobinAssignmentStrategy(t *testing.T) { func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) { strategy := &RoundRobinAssignmentStrategy{} - + members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1", "topic2"}}, {ID: "member2", Subscription: []string{"topic1", "topic2"}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1}, "topic2": {0, 1}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Each member should get 2 partitions (round robin across topics) for memberID, assignment := range assignments { if len(assignment) != 2 { t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment)) } } - + // Verify no partition is assigned twice assignedPartitions := make(map[string]map[int32]bool) for _, assignment := range assignments { @@ -206,19 +206,19 @@ func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) { } func TestGetAssignmentStrategy(t *testing.T) { - rangeStrategy := GetAssignmentStrategy("range") - if rangeStrategy.Name() != "range" { + rangeStrategy := GetAssignmentStrategy(ProtocolNameRange) + if rangeStrategy.Name() != ProtocolNameRange { t.Errorf("Expected range strategy, got %s", rangeStrategy.Name()) } - - rrStrategy := GetAssignmentStrategy("roundrobin") - if rrStrategy.Name() != "roundrobin" { + + rrStrategy := GetAssignmentStrategy(ProtocolNameRoundRobin) + if rrStrategy.Name() != ProtocolNameRoundRobin { t.Errorf("Expected roundrobin strategy, got %s", rrStrategy.Name()) } - + // Unknown strategy should default to range defaultStrategy := GetAssignmentStrategy("unknown") - if defaultStrategy.Name() != "range" { + if defaultStrategy.Name() != ProtocolNameRange { t.Errorf("Expected default strategy to be range, got %s", defaultStrategy.Name()) } } @@ -226,7 +226,7 @@ func TestGetAssignmentStrategy(t *testing.T) { func TestConsumerGroup_AssignPartitions(t *testing.T) { group := &ConsumerGroup{ ID: "test-group", - Protocol: "range", + Protocol: ProtocolNameRange, Members: map[string]*GroupMember{ "member1": { ID: "member1", @@ -234,25 +234,25 @@ func TestConsumerGroup_AssignPartitions(t *testing.T) { State: MemberStateStable, }, "member2": { - ID: "member2", + ID: "member2", Subscription: []string{"topic1"}, State: MemberStateStable, }, }, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + group.AssignPartitions(topicPartitions) - + // Verify assignments were created for memberID, member := range group.Members { if len(member.Assignment) == 0 { t.Errorf("Expected member %s to have partition assignments", memberID) } - + // Verify all assignments are valid for _, pa := range member.Assignment { if pa.Topic != "topic1" { @@ -277,24 +277,24 @@ func TestConsumerGroup_GetMemberAssignments(t *testing.T) { }, }, } - + assignments := group.GetMemberAssignments() - + if len(assignments) != 1 { t.Fatalf("Expected 1 member assignment, got %d", len(assignments)) } - + member1Assignments := assignments["member1"] if len(member1Assignments) != 2 { t.Errorf("Expected 2 partition assignments for member1, got %d", len(member1Assignments)) } - + // Verify assignment content expectedAssignments := []PartitionAssignment{ {Topic: "topic1", Partition: 0}, {Topic: "topic1", Partition: 1}, } - + if !reflect.DeepEqual(member1Assignments, expectedAssignments) { t.Errorf("Expected assignments %v, got %v", expectedAssignments, member1Assignments) } @@ -317,21 +317,21 @@ func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) { "topic2": true, }, } - + // Update member1's subscription group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"}) - + // Verify member subscription updated member1 := group.Members["member1"] expectedSubscription := []string{"topic1", "topic3"} if !reflect.DeepEqual(member1.Subscription, expectedSubscription) { t.Errorf("Expected subscription %v, got %v", expectedSubscription, member1.Subscription) } - + // Verify group subscribed topics updated expectedGroupTopics := []string{"topic1", "topic2", "topic3"} actualGroupTopics := group.GetSubscribedTopics() - + if !reflect.DeepEqual(actualGroupTopics, expectedGroupTopics) { t.Errorf("Expected group topics %v, got %v", expectedGroupTopics, actualGroupTopics) } @@ -340,19 +340,19 @@ func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) { func TestAssignmentStrategy_EmptyMembers(t *testing.T) { rangeStrategy := &RangeAssignmentStrategy{} rrStrategy := &RoundRobinAssignmentStrategy{} - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + // Both strategies should handle empty members gracefully rangeAssignments := rangeStrategy.Assign([]*GroupMember{}, topicPartitions) rrAssignments := rrStrategy.Assign([]*GroupMember{}, topicPartitions) - + if len(rangeAssignments) != 0 { t.Error("Expected empty assignments for empty members list (range)") } - + if len(rrAssignments) != 0 { t.Error("Expected empty assignments for empty members list (round robin)") } |
