aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/rebalance_timeout.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer/rebalance_timeout.go')
-rw-r--r--weed/mq/kafka/consumer/rebalance_timeout.go218
1 files changed, 218 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/rebalance_timeout.go b/weed/mq/kafka/consumer/rebalance_timeout.go
new file mode 100644
index 000000000..9844723c0
--- /dev/null
+++ b/weed/mq/kafka/consumer/rebalance_timeout.go
@@ -0,0 +1,218 @@
+package consumer
+
+import (
+ "time"
+)
+
+// RebalanceTimeoutManager handles rebalance timeout logic and member eviction
+type RebalanceTimeoutManager struct {
+ coordinator *GroupCoordinator
+}
+
+// NewRebalanceTimeoutManager creates a new rebalance timeout manager
+func NewRebalanceTimeoutManager(coordinator *GroupCoordinator) *RebalanceTimeoutManager {
+ return &RebalanceTimeoutManager{
+ coordinator: coordinator,
+ }
+}
+
+// CheckRebalanceTimeouts checks for members that have exceeded rebalance timeouts
+func (rtm *RebalanceTimeoutManager) CheckRebalanceTimeouts() {
+ now := time.Now()
+ rtm.coordinator.groupsMu.RLock()
+ defer rtm.coordinator.groupsMu.RUnlock()
+
+ for _, group := range rtm.coordinator.groups {
+ group.Mu.Lock()
+
+ // Only check timeouts for groups in rebalancing states
+ if group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance {
+ rtm.checkGroupRebalanceTimeout(group, now)
+ }
+
+ group.Mu.Unlock()
+ }
+}
+
+// checkGroupRebalanceTimeout checks and handles rebalance timeout for a specific group
+func (rtm *RebalanceTimeoutManager) checkGroupRebalanceTimeout(group *ConsumerGroup, now time.Time) {
+ expiredMembers := make([]string, 0)
+
+ for memberID, member := range group.Members {
+ // Check if member has exceeded its rebalance timeout
+ rebalanceTimeout := time.Duration(member.RebalanceTimeout) * time.Millisecond
+ if rebalanceTimeout == 0 {
+ // Use default rebalance timeout if not specified
+ rebalanceTimeout = time.Duration(rtm.coordinator.rebalanceTimeoutMs) * time.Millisecond
+ }
+
+ // For members in pending state during rebalance, check against join time
+ if member.State == MemberStatePending {
+ if now.Sub(member.JoinedAt) > rebalanceTimeout {
+ expiredMembers = append(expiredMembers, memberID)
+ }
+ }
+
+ // Also check session timeout as a fallback
+ sessionTimeout := time.Duration(member.SessionTimeout) * time.Millisecond
+ if now.Sub(member.LastHeartbeat) > sessionTimeout {
+ expiredMembers = append(expiredMembers, memberID)
+ }
+ }
+
+ // Remove expired members and trigger rebalance if necessary
+ if len(expiredMembers) > 0 {
+ rtm.evictExpiredMembers(group, expiredMembers)
+ }
+}
+
+// evictExpiredMembers removes expired members and updates group state
+func (rtm *RebalanceTimeoutManager) evictExpiredMembers(group *ConsumerGroup, expiredMembers []string) {
+ for _, memberID := range expiredMembers {
+ delete(group.Members, memberID)
+
+ // If the leader was evicted, clear leader
+ if group.Leader == memberID {
+ group.Leader = ""
+ }
+ }
+
+ // Update group state based on remaining members
+ if len(group.Members) == 0 {
+ group.State = GroupStateEmpty
+ group.Generation++
+ group.Leader = ""
+ } else {
+ // If we were in the middle of rebalancing, restart the process
+ if group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance {
+ // Select new leader if needed
+ if group.Leader == "" {
+ for memberID := range group.Members {
+ group.Leader = memberID
+ break
+ }
+ }
+
+ // Reset to preparing rebalance to restart the process
+ group.State = GroupStatePreparingRebalance
+ group.Generation++
+
+ // Mark remaining members as pending
+ for _, member := range group.Members {
+ member.State = MemberStatePending
+ }
+ }
+ }
+
+ group.LastActivity = time.Now()
+}
+
+// IsRebalanceStuck checks if a group has been stuck in rebalancing for too long
+func (rtm *RebalanceTimeoutManager) IsRebalanceStuck(group *ConsumerGroup, maxRebalanceDuration time.Duration) bool {
+ if group.State != GroupStatePreparingRebalance && group.State != GroupStateCompletingRebalance {
+ return false
+ }
+
+ return time.Since(group.LastActivity) > maxRebalanceDuration
+}
+
+// ForceCompleteRebalance forces completion of a stuck rebalance
+func (rtm *RebalanceTimeoutManager) ForceCompleteRebalance(group *ConsumerGroup) {
+ group.Mu.Lock()
+ defer group.Mu.Unlock()
+
+ // If stuck in preparing rebalance, move to completing
+ if group.State == GroupStatePreparingRebalance {
+ group.State = GroupStateCompletingRebalance
+ group.LastActivity = time.Now()
+ return
+ }
+
+ // If stuck in completing rebalance, force to stable
+ if group.State == GroupStateCompletingRebalance {
+ group.State = GroupStateStable
+ for _, member := range group.Members {
+ member.State = MemberStateStable
+ }
+ group.LastActivity = time.Now()
+ return
+ }
+}
+
+// GetRebalanceStatus returns the current rebalance status for a group
+func (rtm *RebalanceTimeoutManager) GetRebalanceStatus(groupID string) *RebalanceStatus {
+ group := rtm.coordinator.GetGroup(groupID)
+ if group == nil {
+ return nil
+ }
+
+ group.Mu.RLock()
+ defer group.Mu.RUnlock()
+
+ status := &RebalanceStatus{
+ GroupID: groupID,
+ State: group.State,
+ Generation: group.Generation,
+ MemberCount: len(group.Members),
+ Leader: group.Leader,
+ LastActivity: group.LastActivity,
+ IsRebalancing: group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance,
+ RebalanceDuration: time.Since(group.LastActivity),
+ }
+
+ // Calculate member timeout status
+ now := time.Now()
+ for memberID, member := range group.Members {
+ memberStatus := MemberTimeoutStatus{
+ MemberID: memberID,
+ State: member.State,
+ LastHeartbeat: member.LastHeartbeat,
+ JoinedAt: member.JoinedAt,
+ SessionTimeout: time.Duration(member.SessionTimeout) * time.Millisecond,
+ RebalanceTimeout: time.Duration(member.RebalanceTimeout) * time.Millisecond,
+ }
+
+ // Calculate time until session timeout
+ sessionTimeRemaining := memberStatus.SessionTimeout - now.Sub(member.LastHeartbeat)
+ if sessionTimeRemaining < 0 {
+ sessionTimeRemaining = 0
+ }
+ memberStatus.SessionTimeRemaining = sessionTimeRemaining
+
+ // Calculate time until rebalance timeout
+ rebalanceTimeRemaining := memberStatus.RebalanceTimeout - now.Sub(member.JoinedAt)
+ if rebalanceTimeRemaining < 0 {
+ rebalanceTimeRemaining = 0
+ }
+ memberStatus.RebalanceTimeRemaining = rebalanceTimeRemaining
+
+ status.Members = append(status.Members, memberStatus)
+ }
+
+ return status
+}
+
+// RebalanceStatus represents the current status of a group's rebalance
+type RebalanceStatus struct {
+ GroupID string `json:"group_id"`
+ State GroupState `json:"state"`
+ Generation int32 `json:"generation"`
+ MemberCount int `json:"member_count"`
+ Leader string `json:"leader"`
+ LastActivity time.Time `json:"last_activity"`
+ IsRebalancing bool `json:"is_rebalancing"`
+ RebalanceDuration time.Duration `json:"rebalance_duration"`
+ Members []MemberTimeoutStatus `json:"members"`
+}
+
+// MemberTimeoutStatus represents timeout status for a group member
+type MemberTimeoutStatus struct {
+ MemberID string `json:"member_id"`
+ State MemberState `json:"state"`
+ LastHeartbeat time.Time `json:"last_heartbeat"`
+ JoinedAt time.Time `json:"joined_at"`
+ SessionTimeout time.Duration `json:"session_timeout"`
+ RebalanceTimeout time.Duration `json:"rebalance_timeout"`
+ SessionTimeRemaining time.Duration `json:"session_time_remaining"`
+ RebalanceTimeRemaining time.Duration `json:"rebalance_time_remaining"`
+}