aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/assignment_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer/assignment_test.go')
-rw-r--r--weed/mq/kafka/consumer/assignment_test.go128
1 files changed, 64 insertions, 64 deletions
diff --git a/weed/mq/kafka/consumer/assignment_test.go b/weed/mq/kafka/consumer/assignment_test.go
index 520200ed3..14200366f 100644
--- a/weed/mq/kafka/consumer/assignment_test.go
+++ b/weed/mq/kafka/consumer/assignment_test.go
@@ -8,11 +8,11 @@ import (
func TestRangeAssignmentStrategy(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
-
- if strategy.Name() != "range" {
- t.Errorf("Expected strategy name 'range', got '%s'", strategy.Name())
+
+ if strategy.Name() != ProtocolNameRange {
+ t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameRange, strategy.Name())
}
-
+
// Test with 2 members, 4 partitions on one topic
members := []*GroupMember{
{
@@ -20,38 +20,38 @@ func TestRangeAssignmentStrategy(t *testing.T) {
Subscription: []string{"topic1"},
},
{
- ID: "member2",
+ ID: "member2",
Subscription: []string{"topic1"},
},
}
-
+
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
-
+
assignments := strategy.Assign(members, topicPartitions)
-
+
// Verify all members have assignments
if len(assignments) != 2 {
t.Fatalf("Expected assignments for 2 members, got %d", len(assignments))
}
-
+
// Verify total partitions assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
-
+
if totalAssigned != 4 {
t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
}
-
+
// Range assignment should distribute evenly: 2 partitions each
for memberID, assignment := range assignments {
if len(assignment) != 2 {
t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment))
}
-
+
// Verify all assignments are for the subscribed topic
for _, pa := range assignment {
if pa.Topic != "topic1" {
@@ -63,27 +63,27 @@ func TestRangeAssignmentStrategy(t *testing.T) {
func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
-
+
// Test with 3 members, 4 partitions - should distribute 2,1,1
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1"}},
{ID: "member2", Subscription: []string{"topic1"}},
{ID: "member3", Subscription: []string{"topic1"}},
}
-
+
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
-
+
assignments := strategy.Assign(members, topicPartitions)
-
+
// Get assignment counts
counts := make([]int, 0, 3)
for _, assignment := range assignments {
counts = append(counts, len(assignment))
}
sort.Ints(counts)
-
+
// Should be distributed as [1, 1, 2] (first member gets extra partition)
expected := []int{1, 1, 2}
if !reflect.DeepEqual(counts, expected) {
@@ -93,30 +93,30 @@ func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) {
func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
-
+
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1", "topic2"}},
{ID: "member2", Subscription: []string{"topic1"}},
}
-
+
topicPartitions := map[string][]int32{
"topic1": {0, 1},
"topic2": {0, 1},
}
-
+
assignments := strategy.Assign(members, topicPartitions)
-
+
// Member1 should get assignments from both topics
member1Assignments := assignments["member1"]
topicsAssigned := make(map[string]int)
for _, pa := range member1Assignments {
topicsAssigned[pa.Topic]++
}
-
+
if len(topicsAssigned) != 2 {
t.Errorf("Expected member1 to be assigned to 2 topics, got %d", len(topicsAssigned))
}
-
+
// Member2 should only get topic1 assignments
member2Assignments := assignments["member2"]
for _, pa := range member2Assignments {
@@ -128,38 +128,38 @@ func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) {
func TestRoundRobinAssignmentStrategy(t *testing.T) {
strategy := &RoundRobinAssignmentStrategy{}
-
- if strategy.Name() != "roundrobin" {
- t.Errorf("Expected strategy name 'roundrobin', got '%s'", strategy.Name())
+
+ if strategy.Name() != ProtocolNameRoundRobin {
+ t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameRoundRobin, strategy.Name())
}
-
+
// Test with 2 members, 4 partitions on one topic
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1"}},
{ID: "member2", Subscription: []string{"topic1"}},
}
-
+
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
-
+
assignments := strategy.Assign(members, topicPartitions)
-
+
// Verify all members have assignments
if len(assignments) != 2 {
t.Fatalf("Expected assignments for 2 members, got %d", len(assignments))
}
-
+
// Verify total partitions assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
-
+
if totalAssigned != 4 {
t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
}
-
+
// Round robin should distribute evenly: 2 partitions each
for memberID, assignment := range assignments {
if len(assignment) != 2 {
@@ -170,26 +170,26 @@ func TestRoundRobinAssignmentStrategy(t *testing.T) {
func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) {
strategy := &RoundRobinAssignmentStrategy{}
-
+
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1", "topic2"}},
{ID: "member2", Subscription: []string{"topic1", "topic2"}},
}
-
+
topicPartitions := map[string][]int32{
"topic1": {0, 1},
"topic2": {0, 1},
}
-
+
assignments := strategy.Assign(members, topicPartitions)
-
+
// Each member should get 2 partitions (round robin across topics)
for memberID, assignment := range assignments {
if len(assignment) != 2 {
t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment))
}
}
-
+
// Verify no partition is assigned twice
assignedPartitions := make(map[string]map[int32]bool)
for _, assignment := range assignments {
@@ -206,19 +206,19 @@ func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) {
}
func TestGetAssignmentStrategy(t *testing.T) {
- rangeStrategy := GetAssignmentStrategy("range")
- if rangeStrategy.Name() != "range" {
+ rangeStrategy := GetAssignmentStrategy(ProtocolNameRange)
+ if rangeStrategy.Name() != ProtocolNameRange {
t.Errorf("Expected range strategy, got %s", rangeStrategy.Name())
}
-
- rrStrategy := GetAssignmentStrategy("roundrobin")
- if rrStrategy.Name() != "roundrobin" {
+
+ rrStrategy := GetAssignmentStrategy(ProtocolNameRoundRobin)
+ if rrStrategy.Name() != ProtocolNameRoundRobin {
t.Errorf("Expected roundrobin strategy, got %s", rrStrategy.Name())
}
-
+
// Unknown strategy should default to range
defaultStrategy := GetAssignmentStrategy("unknown")
- if defaultStrategy.Name() != "range" {
+ if defaultStrategy.Name() != ProtocolNameRange {
t.Errorf("Expected default strategy to be range, got %s", defaultStrategy.Name())
}
}
@@ -226,7 +226,7 @@ func TestGetAssignmentStrategy(t *testing.T) {
func TestConsumerGroup_AssignPartitions(t *testing.T) {
group := &ConsumerGroup{
ID: "test-group",
- Protocol: "range",
+ Protocol: ProtocolNameRange,
Members: map[string]*GroupMember{
"member1": {
ID: "member1",
@@ -234,25 +234,25 @@ func TestConsumerGroup_AssignPartitions(t *testing.T) {
State: MemberStateStable,
},
"member2": {
- ID: "member2",
+ ID: "member2",
Subscription: []string{"topic1"},
State: MemberStateStable,
},
},
}
-
+
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
-
+
group.AssignPartitions(topicPartitions)
-
+
// Verify assignments were created
for memberID, member := range group.Members {
if len(member.Assignment) == 0 {
t.Errorf("Expected member %s to have partition assignments", memberID)
}
-
+
// Verify all assignments are valid
for _, pa := range member.Assignment {
if pa.Topic != "topic1" {
@@ -277,24 +277,24 @@ func TestConsumerGroup_GetMemberAssignments(t *testing.T) {
},
},
}
-
+
assignments := group.GetMemberAssignments()
-
+
if len(assignments) != 1 {
t.Fatalf("Expected 1 member assignment, got %d", len(assignments))
}
-
+
member1Assignments := assignments["member1"]
if len(member1Assignments) != 2 {
t.Errorf("Expected 2 partition assignments for member1, got %d", len(member1Assignments))
}
-
+
// Verify assignment content
expectedAssignments := []PartitionAssignment{
{Topic: "topic1", Partition: 0},
{Topic: "topic1", Partition: 1},
}
-
+
if !reflect.DeepEqual(member1Assignments, expectedAssignments) {
t.Errorf("Expected assignments %v, got %v", expectedAssignments, member1Assignments)
}
@@ -317,21 +317,21 @@ func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) {
"topic2": true,
},
}
-
+
// Update member1's subscription
group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"})
-
+
// Verify member subscription updated
member1 := group.Members["member1"]
expectedSubscription := []string{"topic1", "topic3"}
if !reflect.DeepEqual(member1.Subscription, expectedSubscription) {
t.Errorf("Expected subscription %v, got %v", expectedSubscription, member1.Subscription)
}
-
+
// Verify group subscribed topics updated
expectedGroupTopics := []string{"topic1", "topic2", "topic3"}
actualGroupTopics := group.GetSubscribedTopics()
-
+
if !reflect.DeepEqual(actualGroupTopics, expectedGroupTopics) {
t.Errorf("Expected group topics %v, got %v", expectedGroupTopics, actualGroupTopics)
}
@@ -340,19 +340,19 @@ func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) {
func TestAssignmentStrategy_EmptyMembers(t *testing.T) {
rangeStrategy := &RangeAssignmentStrategy{}
rrStrategy := &RoundRobinAssignmentStrategy{}
-
+
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
-
+
// Both strategies should handle empty members gracefully
rangeAssignments := rangeStrategy.Assign([]*GroupMember{}, topicPartitions)
rrAssignments := rrStrategy.Assign([]*GroupMember{}, topicPartitions)
-
+
if len(rangeAssignments) != 0 {
t.Error("Expected empty assignments for empty members list (range)")
}
-
+
if len(rrAssignments) != 0 {
t.Error("Expected empty assignments for empty members list (round robin)")
}