aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/group_coordinator.go
blob: 1158f94310d3c72eadef4653382574f2cf002acc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
package consumer

import (
	"crypto/sha256"
	"fmt"
	"sync"
	"time"
)

// GroupState represents the state of a consumer group
type GroupState int

const (
	GroupStateEmpty GroupState = iota
	GroupStatePreparingRebalance
	GroupStateCompletingRebalance
	GroupStateStable
	GroupStateDead
)

func (gs GroupState) String() string {
	switch gs {
	case GroupStateEmpty:
		return "Empty"
	case GroupStatePreparingRebalance:
		return "PreparingRebalance"
	case GroupStateCompletingRebalance:
		return "CompletingRebalance"
	case GroupStateStable:
		return "Stable"
	case GroupStateDead:
		return "Dead"
	default:
		return "Unknown"
	}
}

// MemberState represents the state of a group member
type MemberState int

const (
	MemberStateUnknown MemberState = iota
	MemberStatePending
	MemberStateStable
	MemberStateLeaving
)

func (ms MemberState) String() string {
	switch ms {
	case MemberStateUnknown:
		return "Unknown"
	case MemberStatePending:
		return "Pending"
	case MemberStateStable:
		return "Stable"
	case MemberStateLeaving:
		return "Leaving"
	default:
		return "Unknown"
	}
}

// GroupMember represents a consumer in a consumer group
type GroupMember struct {
	ID               string                // Member ID (generated by gateway)
	ClientID         string                // Client ID from consumer
	ClientHost       string                // Client host/IP
	GroupInstanceID  *string               // Static membership instance ID (optional)
	SessionTimeout   int32                 // Session timeout in milliseconds
	RebalanceTimeout int32                 // Rebalance timeout in milliseconds
	Subscription     []string              // Subscribed topics
	Assignment       []PartitionAssignment // Assigned partitions
	Metadata         []byte                // Protocol-specific metadata
	State            MemberState           // Current member state
	LastHeartbeat    time.Time             // Last heartbeat timestamp
	JoinedAt         time.Time             // When member joined group
}

// PartitionAssignment represents partition assignment for a member
type PartitionAssignment struct {
	Topic     string
	Partition int32
}

// ConsumerGroup represents a Kafka consumer group
type ConsumerGroup struct {
	ID               string                            // Group ID
	State            GroupState                        // Current group state
	Generation       int32                             // Generation ID (incremented on rebalance)
	Protocol         string                            // Assignment protocol (e.g., "range", "roundrobin")
	Leader           string                            // Leader member ID
	Members          map[string]*GroupMember           // Group members by member ID
	StaticMembers    map[string]string                 // Static instance ID -> member ID mapping
	SubscribedTopics map[string]bool                   // Topics subscribed by group
	OffsetCommits    map[string]map[int32]OffsetCommit // Topic -> Partition -> Offset
	CreatedAt        time.Time                         // Group creation time
	LastActivity     time.Time                         // Last activity (join, heartbeat, etc.)

	Mu sync.RWMutex // Protects group state
}

// OffsetCommit represents a committed offset for a topic partition
type OffsetCommit struct {
	Offset    int64     // Committed offset
	Metadata  string    // Optional metadata
	Timestamp time.Time // Commit timestamp
}

// GroupCoordinator manages consumer groups
type GroupCoordinator struct {
	groups   map[string]*ConsumerGroup // Group ID -> Group
	groupsMu sync.RWMutex              // Protects groups map

	// Configuration
	sessionTimeoutMin  int32 // Minimum session timeout (ms)
	sessionTimeoutMax  int32 // Maximum session timeout (ms)
	rebalanceTimeoutMs int32 // Default rebalance timeout (ms)

	// Timeout management
	rebalanceTimeoutManager *RebalanceTimeoutManager

	// Cleanup
	cleanupTicker *time.Ticker
	stopChan      chan struct{}
	stopOnce      sync.Once
}

// NewGroupCoordinator creates a new consumer group coordinator
func NewGroupCoordinator() *GroupCoordinator {
	gc := &GroupCoordinator{
		groups:             make(map[string]*ConsumerGroup),
		sessionTimeoutMin:  6000,   // 6 seconds
		sessionTimeoutMax:  300000, // 5 minutes
		rebalanceTimeoutMs: 300000, // 5 minutes
		stopChan:           make(chan struct{}),
	}

	// Initialize rebalance timeout manager
	gc.rebalanceTimeoutManager = NewRebalanceTimeoutManager(gc)

	// Start cleanup routine
	gc.cleanupTicker = time.NewTicker(30 * time.Second)
	go gc.cleanupRoutine()

	return gc
}

// GetOrCreateGroup returns an existing group or creates a new one
func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup {
	gc.groupsMu.Lock()
	defer gc.groupsMu.Unlock()

	group, exists := gc.groups[groupID]
	if !exists {
		group = &ConsumerGroup{
			ID:               groupID,
			State:            GroupStateEmpty,
			Generation:       0,
			Members:          make(map[string]*GroupMember),
			StaticMembers:    make(map[string]string),
			SubscribedTopics: make(map[string]bool),
			OffsetCommits:    make(map[string]map[int32]OffsetCommit),
			CreatedAt:        time.Now(),
			LastActivity:     time.Now(),
		}
		gc.groups[groupID] = group
	}

	return group
}

// GetGroup returns an existing group or nil if not found
func (gc *GroupCoordinator) GetGroup(groupID string) *ConsumerGroup {
	gc.groupsMu.RLock()
	defer gc.groupsMu.RUnlock()

	return gc.groups[groupID]
}

// RemoveGroup removes a group from the coordinator
func (gc *GroupCoordinator) RemoveGroup(groupID string) {
	gc.groupsMu.Lock()
	defer gc.groupsMu.Unlock()

	delete(gc.groups, groupID)
}

// ListGroups returns all current group IDs
func (gc *GroupCoordinator) ListGroups() []string {
	gc.groupsMu.RLock()
	defer gc.groupsMu.RUnlock()

	groups := make([]string, 0, len(gc.groups))
	for groupID := range gc.groups {
		groups = append(groups, groupID)
	}
	return groups
}

// FindStaticMember finds a member by static instance ID
func (gc *GroupCoordinator) FindStaticMember(group *ConsumerGroup, instanceID string) *GroupMember {
	if instanceID == "" {
		return nil
	}

	group.Mu.RLock()
	defer group.Mu.RUnlock()

	if memberID, exists := group.StaticMembers[instanceID]; exists {
		return group.Members[memberID]
	}
	return nil
}

// FindStaticMemberLocked finds a member by static instance ID (assumes group is already locked)
func (gc *GroupCoordinator) FindStaticMemberLocked(group *ConsumerGroup, instanceID string) *GroupMember {
	if instanceID == "" {
		return nil
	}

	if memberID, exists := group.StaticMembers[instanceID]; exists {
		return group.Members[memberID]
	}
	return nil
}

// RegisterStaticMember registers a static member in the group
func (gc *GroupCoordinator) RegisterStaticMember(group *ConsumerGroup, member *GroupMember) {
	if member.GroupInstanceID == nil || *member.GroupInstanceID == "" {
		return
	}

	group.Mu.Lock()
	defer group.Mu.Unlock()

	group.StaticMembers[*member.GroupInstanceID] = member.ID
}

// RegisterStaticMemberLocked registers a static member in the group (assumes group is already locked)
func (gc *GroupCoordinator) RegisterStaticMemberLocked(group *ConsumerGroup, member *GroupMember) {
	if member.GroupInstanceID == nil || *member.GroupInstanceID == "" {
		return
	}

	group.StaticMembers[*member.GroupInstanceID] = member.ID
}

// UnregisterStaticMember removes a static member from the group
func (gc *GroupCoordinator) UnregisterStaticMember(group *ConsumerGroup, instanceID string) {
	if instanceID == "" {
		return
	}

	group.Mu.Lock()
	defer group.Mu.Unlock()

	delete(group.StaticMembers, instanceID)
}

// UnregisterStaticMemberLocked removes a static member from the group (assumes group is already locked)
func (gc *GroupCoordinator) UnregisterStaticMemberLocked(group *ConsumerGroup, instanceID string) {
	if instanceID == "" {
		return
	}

	delete(group.StaticMembers, instanceID)
}

// IsStaticMember checks if a member is using static membership
func (gc *GroupCoordinator) IsStaticMember(member *GroupMember) bool {
	return member.GroupInstanceID != nil && *member.GroupInstanceID != ""
}

// GenerateMemberID creates a deterministic member ID based on client info
func (gc *GroupCoordinator) GenerateMemberID(clientID, clientHost string) string {
	// EXPERIMENT: Use simpler member ID format like real Kafka brokers
	// Real Kafka uses format like: "consumer-1-uuid" or "consumer-groupId-uuid"
	hash := fmt.Sprintf("%x", sha256.Sum256([]byte(clientID+"-"+clientHost)))
	return fmt.Sprintf("consumer-%s", hash[:16]) // Shorter, simpler format
}

// ValidateSessionTimeout checks if session timeout is within acceptable range
func (gc *GroupCoordinator) ValidateSessionTimeout(timeout int32) bool {
	return timeout >= gc.sessionTimeoutMin && timeout <= gc.sessionTimeoutMax
}

// cleanupRoutine periodically cleans up dead groups and expired members
func (gc *GroupCoordinator) cleanupRoutine() {
	for {
		select {
		case <-gc.cleanupTicker.C:
			gc.performCleanup()
		case <-gc.stopChan:
			return
		}
	}
}

// performCleanup removes expired members and empty groups
func (gc *GroupCoordinator) performCleanup() {
	now := time.Now()

	// Use rebalance timeout manager for more sophisticated timeout handling
	gc.rebalanceTimeoutManager.CheckRebalanceTimeouts()

	gc.groupsMu.Lock()
	defer gc.groupsMu.Unlock()

	for groupID, group := range gc.groups {
		group.Mu.Lock()

		// Check for expired members (session timeout)
		expiredMembers := make([]string, 0)
		for memberID, member := range group.Members {
			sessionDuration := time.Duration(member.SessionTimeout) * time.Millisecond
			timeSinceHeartbeat := now.Sub(member.LastHeartbeat)
			if timeSinceHeartbeat > sessionDuration {
				expiredMembers = append(expiredMembers, memberID)
			}
		}

		// Remove expired members
		for _, memberID := range expiredMembers {
			delete(group.Members, memberID)
			if group.Leader == memberID {
				group.Leader = ""
			}
		}

		// Update group state based on member count
		if len(group.Members) == 0 {
			if group.State != GroupStateEmpty {
				group.State = GroupStateEmpty
				group.Generation++
			}

			// Mark group for deletion if empty for too long (30 minutes)
			if now.Sub(group.LastActivity) > 30*time.Minute {
				group.State = GroupStateDead
			}
		}

		// Check for stuck rebalances and force completion if necessary
		maxRebalanceDuration := 10 * time.Minute // Maximum time allowed for rebalancing
		if gc.rebalanceTimeoutManager.IsRebalanceStuck(group, maxRebalanceDuration) {
			gc.rebalanceTimeoutManager.ForceCompleteRebalance(group)
		}

		group.Mu.Unlock()

		// Remove dead groups
		if group.State == GroupStateDead {
			delete(gc.groups, groupID)
		}
	}
}

// Close shuts down the group coordinator
func (gc *GroupCoordinator) Close() {
	gc.stopOnce.Do(func() {
		close(gc.stopChan)
		if gc.cleanupTicker != nil {
			gc.cleanupTicker.Stop()
		}
	})
}

// GetGroupStats returns statistics about the group coordinator
func (gc *GroupCoordinator) GetGroupStats() map[string]interface{} {
	gc.groupsMu.RLock()
	defer gc.groupsMu.RUnlock()

	stats := map[string]interface{}{
		"total_groups": len(gc.groups),
		"group_states": make(map[string]int),
	}

	stateCount := make(map[GroupState]int)
	totalMembers := 0

	for _, group := range gc.groups {
		group.Mu.RLock()
		stateCount[group.State]++
		totalMembers += len(group.Members)
		group.Mu.RUnlock()
	}

	stats["total_members"] = totalMembers
	for state, count := range stateCount {
		stats["group_states"].(map[string]int)[state.String()] = count
	}

	return stats
}

// GetRebalanceStatus returns the rebalance status for a specific group
func (gc *GroupCoordinator) GetRebalanceStatus(groupID string) *RebalanceStatus {
	return gc.rebalanceTimeoutManager.GetRebalanceStatus(groupID)
}