diff options
Diffstat (limited to 'weed/mq/kafka/consumer/rebalance_timeout_test.go')
| -rw-r--r-- | weed/mq/kafka/consumer/rebalance_timeout_test.go | 331 |
1 files changed, 331 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/rebalance_timeout_test.go b/weed/mq/kafka/consumer/rebalance_timeout_test.go new file mode 100644 index 000000000..ac5f90aee --- /dev/null +++ b/weed/mq/kafka/consumer/rebalance_timeout_test.go @@ -0,0 +1,331 @@ +package consumer + +import ( + "testing" + "time" +) + +func TestRebalanceTimeoutManager_CheckRebalanceTimeouts(t *testing.T) { + coordinator := NewGroupCoordinator() + defer coordinator.Close() + + rtm := coordinator.rebalanceTimeoutManager + + // Create a group with a member that has a short rebalance timeout + group := coordinator.GetOrCreateGroup("test-group") + group.Mu.Lock() + group.State = GroupStatePreparingRebalance + + member := &GroupMember{ + ID: "member1", + ClientID: "client1", + SessionTimeout: 30000, // 30 seconds + RebalanceTimeout: 1000, // 1 second (very short for testing) + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now().Add(-2 * time.Second), // Joined 2 seconds ago + } + group.Members["member1"] = member + group.Mu.Unlock() + + // Check timeouts - member should be evicted + rtm.CheckRebalanceTimeouts() + + group.Mu.RLock() + if len(group.Members) != 0 { + t.Errorf("Expected member to be evicted due to rebalance timeout, but %d members remain", len(group.Members)) + } + + if group.State != GroupStateEmpty { + t.Errorf("Expected group state to be Empty after member eviction, got %s", group.State.String()) + } + group.Mu.RUnlock() +} + +func TestRebalanceTimeoutManager_SessionTimeoutFallback(t *testing.T) { + coordinator := NewGroupCoordinator() + defer coordinator.Close() + + rtm := coordinator.rebalanceTimeoutManager + + // Create a group with a member that has exceeded session timeout + group := coordinator.GetOrCreateGroup("test-group") + group.Mu.Lock() + group.State = GroupStatePreparingRebalance + + member := &GroupMember{ + ID: "member1", + ClientID: "client1", + SessionTimeout: 1000, // 1 second + RebalanceTimeout: 30000, // 30 seconds + State: MemberStatePending, + LastHeartbeat: time.Now().Add(-2 * time.Second), // Last heartbeat 2 seconds ago + JoinedAt: time.Now(), + } + group.Members["member1"] = member + group.Mu.Unlock() + + // Check timeouts - member should be evicted due to session timeout + rtm.CheckRebalanceTimeouts() + + group.Mu.RLock() + if len(group.Members) != 0 { + t.Errorf("Expected member to be evicted due to session timeout, but %d members remain", len(group.Members)) + } + group.Mu.RUnlock() +} + +func TestRebalanceTimeoutManager_LeaderEviction(t *testing.T) { + coordinator := NewGroupCoordinator() + defer coordinator.Close() + + rtm := coordinator.rebalanceTimeoutManager + + // Create a group with leader and another member + group := coordinator.GetOrCreateGroup("test-group") + group.Mu.Lock() + group.State = GroupStatePreparingRebalance + group.Leader = "member1" + + // Leader with expired rebalance timeout + leader := &GroupMember{ + ID: "member1", + ClientID: "client1", + SessionTimeout: 30000, + RebalanceTimeout: 1000, + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now().Add(-2 * time.Second), + } + group.Members["member1"] = leader + + // Another member that's still valid + member2 := &GroupMember{ + ID: "member2", + ClientID: "client2", + SessionTimeout: 30000, + RebalanceTimeout: 30000, + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now(), + } + group.Members["member2"] = member2 + group.Mu.Unlock() + + // Check timeouts - leader should be evicted, new leader selected + rtm.CheckRebalanceTimeouts() + + group.Mu.RLock() + if len(group.Members) != 1 { + t.Errorf("Expected 1 member to remain after leader eviction, got %d", len(group.Members)) + } + + if group.Leader != "member2" { + t.Errorf("Expected member2 to become new leader, got %s", group.Leader) + } + + if group.State != GroupStatePreparingRebalance { + t.Errorf("Expected group to restart rebalancing after leader eviction, got %s", group.State.String()) + } + group.Mu.RUnlock() +} + +func TestRebalanceTimeoutManager_IsRebalanceStuck(t *testing.T) { + coordinator := NewGroupCoordinator() + defer coordinator.Close() + + rtm := coordinator.rebalanceTimeoutManager + + // Create a group that's been rebalancing for a while + group := coordinator.GetOrCreateGroup("test-group") + group.Mu.Lock() + group.State = GroupStatePreparingRebalance + group.LastActivity = time.Now().Add(-15 * time.Minute) // 15 minutes ago + group.Mu.Unlock() + + // Check if rebalance is stuck (max 10 minutes) + maxDuration := 10 * time.Minute + if !rtm.IsRebalanceStuck(group, maxDuration) { + t.Error("Expected rebalance to be detected as stuck") + } + + // Test with a group that's not stuck + group.Mu.Lock() + group.LastActivity = time.Now().Add(-5 * time.Minute) // 5 minutes ago + group.Mu.Unlock() + + if rtm.IsRebalanceStuck(group, maxDuration) { + t.Error("Expected rebalance to not be detected as stuck") + } + + // Test with stable group (should not be stuck) + group.Mu.Lock() + group.State = GroupStateStable + group.LastActivity = time.Now().Add(-15 * time.Minute) + group.Mu.Unlock() + + if rtm.IsRebalanceStuck(group, maxDuration) { + t.Error("Stable group should not be detected as stuck") + } +} + +func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) { + coordinator := NewGroupCoordinator() + defer coordinator.Close() + + rtm := coordinator.rebalanceTimeoutManager + + // Test forcing completion from PreparingRebalance + group := coordinator.GetOrCreateGroup("test-group") + group.Mu.Lock() + group.State = GroupStatePreparingRebalance + + member := &GroupMember{ + ID: "member1", + State: MemberStatePending, + } + group.Members["member1"] = member + group.Mu.Unlock() + + rtm.ForceCompleteRebalance(group) + + group.Mu.RLock() + if group.State != GroupStateCompletingRebalance { + t.Errorf("Expected group state to be CompletingRebalance, got %s", group.State.String()) + } + group.Mu.RUnlock() + + // Test forcing completion from CompletingRebalance + rtm.ForceCompleteRebalance(group) + + group.Mu.RLock() + if group.State != GroupStateStable { + t.Errorf("Expected group state to be Stable, got %s", group.State.String()) + } + + if member.State != MemberStateStable { + t.Errorf("Expected member state to be Stable, got %s", member.State.String()) + } + group.Mu.RUnlock() +} + +func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) { + coordinator := NewGroupCoordinator() + defer coordinator.Close() + + rtm := coordinator.rebalanceTimeoutManager + + // Test with non-existent group + status := rtm.GetRebalanceStatus("non-existent") + if status != nil { + t.Error("Expected nil status for non-existent group") + } + + // Create a group with members + group := coordinator.GetOrCreateGroup("test-group") + group.Mu.Lock() + group.State = GroupStatePreparingRebalance + group.Generation = 5 + group.Leader = "member1" + group.LastActivity = time.Now().Add(-2 * time.Minute) + + member1 := &GroupMember{ + ID: "member1", + State: MemberStatePending, + LastHeartbeat: time.Now().Add(-30 * time.Second), + JoinedAt: time.Now().Add(-2 * time.Minute), + SessionTimeout: 30000, // 30 seconds + RebalanceTimeout: 300000, // 5 minutes + } + group.Members["member1"] = member1 + + member2 := &GroupMember{ + ID: "member2", + State: MemberStatePending, + LastHeartbeat: time.Now().Add(-10 * time.Second), + JoinedAt: time.Now().Add(-1 * time.Minute), + SessionTimeout: 60000, // 1 minute + RebalanceTimeout: 180000, // 3 minutes + } + group.Members["member2"] = member2 + group.Mu.Unlock() + + // Get status + status = rtm.GetRebalanceStatus("test-group") + + if status == nil { + t.Fatal("Expected non-nil status") + } + + if status.GroupID != "test-group" { + t.Errorf("Expected group ID 'test-group', got %s", status.GroupID) + } + + if status.State != GroupStatePreparingRebalance { + t.Errorf("Expected state PreparingRebalance, got %s", status.State.String()) + } + + if status.Generation != 5 { + t.Errorf("Expected generation 5, got %d", status.Generation) + } + + if status.MemberCount != 2 { + t.Errorf("Expected 2 members, got %d", status.MemberCount) + } + + if status.Leader != "member1" { + t.Errorf("Expected leader 'member1', got %s", status.Leader) + } + + if !status.IsRebalancing { + t.Error("Expected IsRebalancing to be true") + } + + if len(status.Members) != 2 { + t.Errorf("Expected 2 member statuses, got %d", len(status.Members)) + } + + // Check member timeout calculations + for _, memberStatus := range status.Members { + if memberStatus.SessionTimeRemaining < 0 { + t.Errorf("Session time remaining should not be negative for member %s", memberStatus.MemberID) + } + + if memberStatus.RebalanceTimeRemaining < 0 { + t.Errorf("Rebalance time remaining should not be negative for member %s", memberStatus.MemberID) + } + } +} + +func TestRebalanceTimeoutManager_DefaultRebalanceTimeout(t *testing.T) { + coordinator := NewGroupCoordinator() + defer coordinator.Close() + + rtm := coordinator.rebalanceTimeoutManager + + // Create a group with a member that has no rebalance timeout set (0) + group := coordinator.GetOrCreateGroup("test-group") + group.Mu.Lock() + group.State = GroupStatePreparingRebalance + + member := &GroupMember{ + ID: "member1", + ClientID: "client1", + SessionTimeout: 30000, // 30 seconds + RebalanceTimeout: 0, // Not set, should use default + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now().Add(-6 * time.Minute), // Joined 6 minutes ago + } + group.Members["member1"] = member + group.Mu.Unlock() + + // Default rebalance timeout is 5 minutes (300000ms), so member should be evicted + rtm.CheckRebalanceTimeouts() + + group.Mu.RLock() + if len(group.Members) != 0 { + t.Errorf("Expected member to be evicted using default rebalance timeout, but %d members remain", len(group.Members)) + } + group.Mu.RUnlock() +} |
