aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/rebalance_timeout_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer/rebalance_timeout_test.go')
-rw-r--r--weed/mq/kafka/consumer/rebalance_timeout_test.go331
1 files changed, 331 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/rebalance_timeout_test.go b/weed/mq/kafka/consumer/rebalance_timeout_test.go
new file mode 100644
index 000000000..ac5f90aee
--- /dev/null
+++ b/weed/mq/kafka/consumer/rebalance_timeout_test.go
@@ -0,0 +1,331 @@
+package consumer
+
+import (
+ "testing"
+ "time"
+)
+
+func TestRebalanceTimeoutManager_CheckRebalanceTimeouts(t *testing.T) {
+ coordinator := NewGroupCoordinator()
+ defer coordinator.Close()
+
+ rtm := coordinator.rebalanceTimeoutManager
+
+ // Create a group with a member that has a short rebalance timeout
+ group := coordinator.GetOrCreateGroup("test-group")
+ group.Mu.Lock()
+ group.State = GroupStatePreparingRebalance
+
+ member := &GroupMember{
+ ID: "member1",
+ ClientID: "client1",
+ SessionTimeout: 30000, // 30 seconds
+ RebalanceTimeout: 1000, // 1 second (very short for testing)
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now().Add(-2 * time.Second), // Joined 2 seconds ago
+ }
+ group.Members["member1"] = member
+ group.Mu.Unlock()
+
+ // Check timeouts - member should be evicted
+ rtm.CheckRebalanceTimeouts()
+
+ group.Mu.RLock()
+ if len(group.Members) != 0 {
+ t.Errorf("Expected member to be evicted due to rebalance timeout, but %d members remain", len(group.Members))
+ }
+
+ if group.State != GroupStateEmpty {
+ t.Errorf("Expected group state to be Empty after member eviction, got %s", group.State.String())
+ }
+ group.Mu.RUnlock()
+}
+
+func TestRebalanceTimeoutManager_SessionTimeoutFallback(t *testing.T) {
+ coordinator := NewGroupCoordinator()
+ defer coordinator.Close()
+
+ rtm := coordinator.rebalanceTimeoutManager
+
+ // Create a group with a member that has exceeded session timeout
+ group := coordinator.GetOrCreateGroup("test-group")
+ group.Mu.Lock()
+ group.State = GroupStatePreparingRebalance
+
+ member := &GroupMember{
+ ID: "member1",
+ ClientID: "client1",
+ SessionTimeout: 1000, // 1 second
+ RebalanceTimeout: 30000, // 30 seconds
+ State: MemberStatePending,
+ LastHeartbeat: time.Now().Add(-2 * time.Second), // Last heartbeat 2 seconds ago
+ JoinedAt: time.Now(),
+ }
+ group.Members["member1"] = member
+ group.Mu.Unlock()
+
+ // Check timeouts - member should be evicted due to session timeout
+ rtm.CheckRebalanceTimeouts()
+
+ group.Mu.RLock()
+ if len(group.Members) != 0 {
+ t.Errorf("Expected member to be evicted due to session timeout, but %d members remain", len(group.Members))
+ }
+ group.Mu.RUnlock()
+}
+
+func TestRebalanceTimeoutManager_LeaderEviction(t *testing.T) {
+ coordinator := NewGroupCoordinator()
+ defer coordinator.Close()
+
+ rtm := coordinator.rebalanceTimeoutManager
+
+ // Create a group with leader and another member
+ group := coordinator.GetOrCreateGroup("test-group")
+ group.Mu.Lock()
+ group.State = GroupStatePreparingRebalance
+ group.Leader = "member1"
+
+ // Leader with expired rebalance timeout
+ leader := &GroupMember{
+ ID: "member1",
+ ClientID: "client1",
+ SessionTimeout: 30000,
+ RebalanceTimeout: 1000,
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now().Add(-2 * time.Second),
+ }
+ group.Members["member1"] = leader
+
+ // Another member that's still valid
+ member2 := &GroupMember{
+ ID: "member2",
+ ClientID: "client2",
+ SessionTimeout: 30000,
+ RebalanceTimeout: 30000,
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now(),
+ }
+ group.Members["member2"] = member2
+ group.Mu.Unlock()
+
+ // Check timeouts - leader should be evicted, new leader selected
+ rtm.CheckRebalanceTimeouts()
+
+ group.Mu.RLock()
+ if len(group.Members) != 1 {
+ t.Errorf("Expected 1 member to remain after leader eviction, got %d", len(group.Members))
+ }
+
+ if group.Leader != "member2" {
+ t.Errorf("Expected member2 to become new leader, got %s", group.Leader)
+ }
+
+ if group.State != GroupStatePreparingRebalance {
+ t.Errorf("Expected group to restart rebalancing after leader eviction, got %s", group.State.String())
+ }
+ group.Mu.RUnlock()
+}
+
+func TestRebalanceTimeoutManager_IsRebalanceStuck(t *testing.T) {
+ coordinator := NewGroupCoordinator()
+ defer coordinator.Close()
+
+ rtm := coordinator.rebalanceTimeoutManager
+
+ // Create a group that's been rebalancing for a while
+ group := coordinator.GetOrCreateGroup("test-group")
+ group.Mu.Lock()
+ group.State = GroupStatePreparingRebalance
+ group.LastActivity = time.Now().Add(-15 * time.Minute) // 15 minutes ago
+ group.Mu.Unlock()
+
+ // Check if rebalance is stuck (max 10 minutes)
+ maxDuration := 10 * time.Minute
+ if !rtm.IsRebalanceStuck(group, maxDuration) {
+ t.Error("Expected rebalance to be detected as stuck")
+ }
+
+ // Test with a group that's not stuck
+ group.Mu.Lock()
+ group.LastActivity = time.Now().Add(-5 * time.Minute) // 5 minutes ago
+ group.Mu.Unlock()
+
+ if rtm.IsRebalanceStuck(group, maxDuration) {
+ t.Error("Expected rebalance to not be detected as stuck")
+ }
+
+ // Test with stable group (should not be stuck)
+ group.Mu.Lock()
+ group.State = GroupStateStable
+ group.LastActivity = time.Now().Add(-15 * time.Minute)
+ group.Mu.Unlock()
+
+ if rtm.IsRebalanceStuck(group, maxDuration) {
+ t.Error("Stable group should not be detected as stuck")
+ }
+}
+
+func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) {
+ coordinator := NewGroupCoordinator()
+ defer coordinator.Close()
+
+ rtm := coordinator.rebalanceTimeoutManager
+
+ // Test forcing completion from PreparingRebalance
+ group := coordinator.GetOrCreateGroup("test-group")
+ group.Mu.Lock()
+ group.State = GroupStatePreparingRebalance
+
+ member := &GroupMember{
+ ID: "member1",
+ State: MemberStatePending,
+ }
+ group.Members["member1"] = member
+ group.Mu.Unlock()
+
+ rtm.ForceCompleteRebalance(group)
+
+ group.Mu.RLock()
+ if group.State != GroupStateCompletingRebalance {
+ t.Errorf("Expected group state to be CompletingRebalance, got %s", group.State.String())
+ }
+ group.Mu.RUnlock()
+
+ // Test forcing completion from CompletingRebalance
+ rtm.ForceCompleteRebalance(group)
+
+ group.Mu.RLock()
+ if group.State != GroupStateStable {
+ t.Errorf("Expected group state to be Stable, got %s", group.State.String())
+ }
+
+ if member.State != MemberStateStable {
+ t.Errorf("Expected member state to be Stable, got %s", member.State.String())
+ }
+ group.Mu.RUnlock()
+}
+
+func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) {
+ coordinator := NewGroupCoordinator()
+ defer coordinator.Close()
+
+ rtm := coordinator.rebalanceTimeoutManager
+
+ // Test with non-existent group
+ status := rtm.GetRebalanceStatus("non-existent")
+ if status != nil {
+ t.Error("Expected nil status for non-existent group")
+ }
+
+ // Create a group with members
+ group := coordinator.GetOrCreateGroup("test-group")
+ group.Mu.Lock()
+ group.State = GroupStatePreparingRebalance
+ group.Generation = 5
+ group.Leader = "member1"
+ group.LastActivity = time.Now().Add(-2 * time.Minute)
+
+ member1 := &GroupMember{
+ ID: "member1",
+ State: MemberStatePending,
+ LastHeartbeat: time.Now().Add(-30 * time.Second),
+ JoinedAt: time.Now().Add(-2 * time.Minute),
+ SessionTimeout: 30000, // 30 seconds
+ RebalanceTimeout: 300000, // 5 minutes
+ }
+ group.Members["member1"] = member1
+
+ member2 := &GroupMember{
+ ID: "member2",
+ State: MemberStatePending,
+ LastHeartbeat: time.Now().Add(-10 * time.Second),
+ JoinedAt: time.Now().Add(-1 * time.Minute),
+ SessionTimeout: 60000, // 1 minute
+ RebalanceTimeout: 180000, // 3 minutes
+ }
+ group.Members["member2"] = member2
+ group.Mu.Unlock()
+
+ // Get status
+ status = rtm.GetRebalanceStatus("test-group")
+
+ if status == nil {
+ t.Fatal("Expected non-nil status")
+ }
+
+ if status.GroupID != "test-group" {
+ t.Errorf("Expected group ID 'test-group', got %s", status.GroupID)
+ }
+
+ if status.State != GroupStatePreparingRebalance {
+ t.Errorf("Expected state PreparingRebalance, got %s", status.State.String())
+ }
+
+ if status.Generation != 5 {
+ t.Errorf("Expected generation 5, got %d", status.Generation)
+ }
+
+ if status.MemberCount != 2 {
+ t.Errorf("Expected 2 members, got %d", status.MemberCount)
+ }
+
+ if status.Leader != "member1" {
+ t.Errorf("Expected leader 'member1', got %s", status.Leader)
+ }
+
+ if !status.IsRebalancing {
+ t.Error("Expected IsRebalancing to be true")
+ }
+
+ if len(status.Members) != 2 {
+ t.Errorf("Expected 2 member statuses, got %d", len(status.Members))
+ }
+
+ // Check member timeout calculations
+ for _, memberStatus := range status.Members {
+ if memberStatus.SessionTimeRemaining < 0 {
+ t.Errorf("Session time remaining should not be negative for member %s", memberStatus.MemberID)
+ }
+
+ if memberStatus.RebalanceTimeRemaining < 0 {
+ t.Errorf("Rebalance time remaining should not be negative for member %s", memberStatus.MemberID)
+ }
+ }
+}
+
+func TestRebalanceTimeoutManager_DefaultRebalanceTimeout(t *testing.T) {
+ coordinator := NewGroupCoordinator()
+ defer coordinator.Close()
+
+ rtm := coordinator.rebalanceTimeoutManager
+
+ // Create a group with a member that has no rebalance timeout set (0)
+ group := coordinator.GetOrCreateGroup("test-group")
+ group.Mu.Lock()
+ group.State = GroupStatePreparingRebalance
+
+ member := &GroupMember{
+ ID: "member1",
+ ClientID: "client1",
+ SessionTimeout: 30000, // 30 seconds
+ RebalanceTimeout: 0, // Not set, should use default
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now().Add(-6 * time.Minute), // Joined 6 minutes ago
+ }
+ group.Members["member1"] = member
+ group.Mu.Unlock()
+
+ // Default rebalance timeout is 5 minutes (300000ms), so member should be evicted
+ rtm.CheckRebalanceTimeouts()
+
+ group.Mu.RLock()
+ if len(group.Members) != 0 {
+ t.Errorf("Expected member to be evicted using default rebalance timeout, but %d members remain", len(group.Members))
+ }
+ group.Mu.RUnlock()
+}