aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/sub_coordinator/market.go
blob: df07edfd59c19e30e06693682f31fc5391d02a84 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
package sub_coordinator

import (
	"errors"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"sync"
	"time"
)

/*
Market is a data structure that keeps track of the state of the consumer group instances and the partitions.

When rebalancing, the market will try to balance the load of the partitions among the consumer group instances.
For each loop, the market will:
* If a consumer group instance has more partitions than the average, the market will unassign some of its partitions.
* If a consumer group instance has fewer partitions than the average, the market will assign some partitions to it.

Trigger rebalance when:
* A new consumer group instance is added
* Some partitions are unassigned from a consumer group instance.

If multiple rebalance requests are received within a certain period, the market will only process the latest request.

However, if the number of unassigned partitions increases to exactly the total number of partitions,
and the total number of partitions is less than or equal to the sum of the max partition counts of all consumer group instances,
the market will process the request immediately.
This is to ensure a partition can be migrated to another consumer group instance as soon as possible.

Emit these adjustments to the subscriber coordinator:
* Assign a partition to a consumer group instance
* Unassign a partition from a consumer group instance

Because adjustments are sent to the subscriber coordinator, the market keeps track of the inflight adjustments.
The subscriber coordinator sends a response back to the market when an adjustment has been processed.
If an adjustment is older than inflightAdjustmentTTL, it is considered expired.
Otherwise, the adjustment is considered inflight and is counted when calculating the load.

Later features:
* A consumer group instance is not keeping up with the load.

Since a coordinator, and thus the market, may be restarted or moved to another node, the market should be able to recover the state from the subscriber coordinator.
The subscriber coordinator should be able to send the current state of the consumer group instances and the partitions to the market.

*/

// PartitionSlot records one partition tracked by the Market together with the
// consumer group instance it is currently assigned to.
type PartitionSlot struct {
	Partition  topic.Partition
	AssignedTo *ConsumerGroupInstance // owner once an assignment is confirmed; nil while unassigned
}

// Adjustment is a single assign or unassign instruction emitted by the Market
// on AdjustmentChan. Once acted upon it is expected to be handed back to
// Market.ConfirmAdjustment.
type Adjustment struct {
	// isAssign is true for an assignment and false for an unassignment.
	isAssign  bool
	partition topic.Partition
	// consumer is the instance that should receive the partition (assign) or
	// that currently holds it (unassign).
	consumer  ConsumerGroupInstanceId
	// ts is when the adjustment was emitted; the market stops treating it as
	// inflight once it is older than the market's inflightAdjustmentTTL.
	ts        time.Time
}

// Market tracks which consumer group instance each partition is assigned to
// and drives rebalancing by emitting Adjustments on AdjustmentChan. Create it
// with NewMarket, which also starts the background balancing goroutine, and
// stop it with ShutdownMarket.
type Market struct {
	// mu guards the assignment state mutated by the exported methods:
	// partitions (including each slot's AssignedTo), consumerInstances, and
	// each instance's AssignedPartitions.
	mu                    sync.Mutex
	// partitions holds one slot per partition passed to NewMarket.
	partitions            map[topic.Partition]*PartitionSlot
	// consumerInstances holds the registered instances, keyed by InstanceId.
	consumerInstances     map[ConsumerGroupInstanceId]*ConsumerGroupInstance
	// AdjustmentChan carries emitted assign/unassign adjustments outward.
	// NewMarket gives it a buffer of 100; ShutdownMarket closes it.
	AdjustmentChan        chan *Adjustment
	// inflightAdjustments holds emitted adjustments that have not yet expired;
	// they are counted when computing consumer load. In this file they are
	// removed only by TTL expiry in loopBalanceLoad, not on confirmation.
	inflightAdjustments   []*Adjustment
	// inflightAdjustmentTTL is how long an emitted adjustment stays inflight.
	inflightAdjustmentTTL time.Duration
	// lastBalancedTime is the timestamp of the most recently emitted
	// adjustment (initialised by NewMarket). It is not read in this file.
	lastBalancedTime      time.Time
	// stopChan is closed by ShutdownMarket to stop the balancing goroutine.
	stopChan              chan struct{}
	// balanceRequestChan tells the balancing goroutine a rebalance is wanted.
	balanceRequestChan    chan struct{}
	// hasBalanceRequest is read and written only by loopBalanceLoad, which
	// sets it when a request arrives and clears it when it runs a pass.
	hasBalanceRequest     bool
}

// NewMarket creates a Market tracking the given partitions, all initially
// unassigned, and starts its background balancing goroutine. Emitted
// adjustments count toward consumer load until they are older than
// inflightAdjustmentTTL. Call ShutdownMarket to stop the goroutine.
func NewMarket(partitions []topic.Partition, inflightAdjustmentTTL time.Duration) *Market {
	partitionMap := make(map[topic.Partition]*PartitionSlot, len(partitions))
	for _, partition := range partitions {
		partitionMap[partition] = &PartitionSlot{
			Partition: partition,
		}
	}
	// balanceRequestChan has one slot of buffer: the balancing goroutine only
	// turns a request into a flag, so a single pending request is enough to
	// coalesce a burst. Without the buffer every sender must rendezvous with
	// that goroutine, and senders that hold m.mu deadlock if the goroutine is
	// itself waiting for m.mu.
	m := &Market{
		partitions:            partitionMap,
		consumerInstances:     make(map[ConsumerGroupInstanceId]*ConsumerGroupInstance),
		AdjustmentChan:        make(chan *Adjustment, 100),
		inflightAdjustmentTTL: inflightAdjustmentTTL,
		lastBalancedTime:      time.Now(),
		stopChan:              make(chan struct{}),
		balanceRequestChan:    make(chan struct{}, 1),
	}
	go m.loopBalanceLoad()

	return m
}

// ShutdownMarket stops the background balancing goroutine by closing stopChan
// and closes AdjustmentChan so receivers observe the end of the stream.
//
// Calling it again after it has returned is a no-op instead of a
// "close of closed channel" panic. It is not safe to call concurrently with
// itself.
//
// NOTE(review): the balancing goroutine may be part-way through a pass that
// sends on AdjustmentChan when it is closed here, which would panic with
// "send on closed channel". Fixing that requires waiting for the goroutine to
// exit (e.g. a done channel), which Market does not currently provide.
func (m *Market) ShutdownMarket() {
	select {
	case <-m.stopChan:
		// Already shut down; closing the channels again would panic.
		return
	default:
	}
	close(m.stopChan)
	close(m.AdjustmentChan)
}

// AddConsumerInstance registers consumer with the market and requests a
// rebalance. It returns an error if an instance with the same InstanceId is
// already registered.
//
// The rebalance request is sent only after m.mu is released. The balancing
// goroutine may itself be waiting for m.mu (for example inside Status), so
// sending while holding the lock could deadlock. The send also gives up once
// the market has been shut down instead of blocking forever.
func (m *Market) AddConsumerInstance(consumer *ConsumerGroupInstance) error {
	m.mu.Lock()
	if _, exists := m.consumerInstances[consumer.InstanceId]; exists {
		m.mu.Unlock()
		return errors.New("consumer instance already exists")
	}
	m.consumerInstances[consumer.InstanceId] = consumer
	m.mu.Unlock()

	select {
	case m.balanceRequestChan <- struct{}{}:
	case <-m.stopChan:
	}

	return nil
}

// RemoveConsumerInstance unregisters the instance with the given id, marks the
// partitions it owned as unassigned, and requests a rebalance so they can be
// handed to the remaining instances. Removing an unknown id is a no-op and
// returns nil.
//
// The rebalance request is sent only after m.mu is released, to avoid
// deadlocking with the balancing goroutine waiting for m.mu. The send gives up
// once the market has been shut down.
func (m *Market) RemoveConsumerInstance(consumerId ConsumerGroupInstanceId) error {
	m.mu.Lock()
	consumer, exists := m.consumerInstances[consumerId]
	if !exists {
		m.mu.Unlock()
		return nil
	}
	delete(m.consumerInstances, consumerId)

	for _, partition := range consumer.AssignedPartitions {
		// Only release slots that really belong to the departing consumer, so an
		// inconsistent AssignedPartitions list cannot orphan another owner's slot.
		if partitionSlot, ok := m.partitions[partition]; ok && partitionSlot.AssignedTo == consumer {
			partitionSlot.AssignedTo = nil
		}
	}
	m.mu.Unlock()

	select {
	case m.balanceRequestChan <- struct{}{}:
	case <-m.stopChan:
	}

	return nil
}

// assignPartitionToConsumer picks the least-loaded consumer instance that is
// still below its MaxPartitionCount and emits an assign adjustment for
// partition. Load is len(AssignedPartitions) plus the net count of that
// consumer's inflight adjustments (+1 per assign, -1 per unassign). If no
// consumer has spare capacity nothing is emitted. The consumer's
// AssignedPartitions is left untouched; it changes only when the adjustment
// is confirmed.
func (m *Market) assignPartitionToConsumer(partition *PartitionSlot) {
	pendingDelta := make(map[ConsumerGroupInstanceId]int)
	for _, adj := range m.inflightAdjustments {
		if adj.isAssign {
			pendingDelta[adj.consumer]++
			continue
		}
		pendingDelta[adj.consumer]--
	}

	var bestConsumer *ConsumerGroupInstance
	bestLoad := int(^uint(0) >> 1) // largest int, so the first eligible consumer always wins
	for _, candidate := range m.consumerInstances {
		load := len(candidate.AssignedPartitions) + pendingDelta[candidate.InstanceId]
		if load >= int(candidate.MaxPartitionCount) || load >= bestLoad {
			continue
		}
		bestConsumer, bestLoad = candidate, load
	}
	if bestConsumer == nil {
		return
	}

	adjustment := &Adjustment{
		isAssign:  true,
		partition: partition.Partition,
		consumer:  bestConsumer.InstanceId,
		ts:        time.Now(),
	}
	m.AdjustmentChan <- adjustment
	m.inflightAdjustments = append(m.inflightAdjustments, adjustment)
	m.lastBalancedTime = adjustment.ts
}

// loopBalanceLoad is the market's background goroutine, started by NewMarket.
// A request received on balanceRequestChan only sets hasBalanceRequest. The
// flag is serviced at most once per 500ms tick, so a burst of requests leads
// to a single balancing pass. Each pass drops inflight adjustments older than
// inflightAdjustmentTTL, runs doBalanceLoad, and then logs the state via
// Status. The loop returns when stopChan is closed.
func (m *Market) loopBalanceLoad() {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if !m.hasBalanceRequest {
				continue
			}
			m.hasBalanceRequest = false

			// The pass reads consumerInstances, partitions and AssignedPartitions,
			// which the exported methods mutate under m.mu. Reading them without
			// the lock is a data race, and concurrent map read/write is fatal.
			// NOTE(review): doBalanceLoad sends on AdjustmentChan while m.mu is
			// held. If that buffer fills and its reader needs m.mu (e.g. through
			// ConfirmAdjustment) before draining it, the two will deadlock.
			m.mu.Lock()
			now := time.Now()
			unexpired := make([]*Adjustment, 0, len(m.inflightAdjustments))
			for _, adjustment := range m.inflightAdjustments {
				if adjustment.ts.Add(m.inflightAdjustmentTTL).After(now) {
					unexpired = append(unexpired, adjustment)
				}
			}
			m.inflightAdjustments = unexpired
			m.doBalanceLoad()
			m.mu.Unlock()

			// Status acquires m.mu itself, so it must run after the unlock.
			m.Status()
		case <-m.balanceRequestChan:
			m.hasBalanceRequest = true
		case <-m.stopChan:
			return
		}
	}
}

// doBalanceLoad runs one balancing pass over the consumer group instances.
// It first asks consumers holding more partitions than the average to give
// some up. Only when adjustBusyConsumers reports no adjustments does it go on
// to hand unassigned partitions to consumers that have fewer partitions and
// spare capacity. With no consumer instances registered it does nothing.
func (m *Market) doBalanceLoad() {
	if len(m.consumerInstances) == 0 {
		return
	}

	average := m.findAverageLoad()
	if shed := m.adjustBusyConsumers(average); shed {
		return
	}
	m.adjustUnassignedPartitions()
}
// findAverageLoad returns the mean number of partitions per registered
// consumer instance. Partitions already in AssignedPartitions are counted, plus
// one for each inflight assign and minus one for each inflight unassign.
// Callers must ensure at least one instance is registered. Otherwise the float
// division by zero yields NaN or ±Inf rather than a meaningful average.
func (m *Market) findAverageLoad() (averageLoad float32) {
	total := 0
	for _, consumer := range m.consumerInstances {
		total += len(consumer.AssignedPartitions)
	}
	for _, adjustment := range m.inflightAdjustments {
		step := -1
		if adjustment.isAssign {
			step = 1
		}
		total += step
	}
	averageLoad = float32(total) / float32(len(m.consumerInstances))
	return averageLoad
}

// adjustBusyConsumers emits unassign adjustments for consumers whose load is
// above averageLoad. A consumer's load is len(AssignedPartitions) plus the net
// effect of its inflight adjustments (+1 per assign, -1 per unassign). Each such
// consumer sheds up to int(load-averageLoad) partitions (truncated toward zero),
// taken in order from its AssignedPartitions.
//
// Partitions that already have an inflight adjustment are skipped, so the same
// unassignment is never requested twice. The number emitted is bounded by the
// partitions actually assigned: inflight assigns can push the computed load
// above len(AssignedPartitions), and indexing by delta would then run past the
// end of the slice.
//
// It reports whether at least one adjustment was emitted.
func (m *Market) adjustBusyConsumers(averageLoad float32) (hasAdjustments bool) {
	inflightConsumerAdjustments := make(map[ConsumerGroupInstanceId]int)
	inflightPartitions := make(map[topic.Partition]struct{}, len(m.inflightAdjustments))
	for _, adjustment := range m.inflightAdjustments {
		if adjustment.isAssign {
			inflightConsumerAdjustments[adjustment.consumer]++
		} else {
			inflightConsumerAdjustments[adjustment.consumer]--
		}
		inflightPartitions[adjustment.partition] = struct{}{}
	}

	for _, consumer := range m.consumerInstances {
		consumerLoad := len(consumer.AssignedPartitions) + inflightConsumerAdjustments[consumer.InstanceId]
		delta := int(float32(consumerLoad) - averageLoad)
		if delta <= 0 {
			continue
		}
		adjustTime := time.Now()
		for _, partition := range consumer.AssignedPartitions {
			if delta == 0 {
				break
			}
			if _, pending := inflightPartitions[partition]; pending {
				// Already being moved; another unassign would duplicate the request.
				continue
			}
			adjustment := &Adjustment{
				isAssign:  false,
				partition: partition,
				consumer:  consumer.InstanceId,
				ts:        adjustTime,
			}
			m.AdjustmentChan <- adjustment
			m.inflightAdjustments = append(m.inflightAdjustments, adjustment)
			m.lastBalancedTime = adjustment.ts
			inflightPartitions[partition] = struct{}{}
			delta--
			hasAdjustments = true
		}
	}
	return hasAdjustments
}

// adjustUnassignedPartitions tries to place every partition that has no owner
// and no inflight adjustment (of either kind) by calling
// assignPartitionToConsumer for it.
func (m *Market) adjustUnassignedPartitions() {
	pending := make(map[topic.Partition]struct{}, len(m.inflightAdjustments))
	for _, adj := range m.inflightAdjustments {
		pending[adj.partition] = struct{}{}
	}

	for _, slot := range m.partitions {
		if slot.AssignedTo != nil {
			continue
		}
		if _, inflight := pending[slot.Partition]; inflight {
			continue
		}
		m.assignPartitionToConsumer(slot)
	}
}

// ConfirmAdjustment applies an adjustment that has been acted upon. An
// assignment is recorded via confirmAssignPartition and an unassignment via
// unassignPartitionSlot. Afterwards it logs the adjustment and the full market
// state.
//
// NOTE(review): the adjustment is not removed from inflightAdjustments here,
// so until inflightAdjustmentTTL passes it keeps contributing to load
// calculations on top of the change already applied to AssignedPartitions.
func (m *Market) ConfirmAdjustment(adjustment *Adjustment) {
	switch {
	case adjustment.isAssign:
		m.confirmAssignPartition(adjustment.partition, adjustment.consumer)
	default:
		m.unassignPartitionSlot(adjustment.partition)
	}
	glog.V(1).Infof("ConfirmAdjustment %+v", adjustment)
	m.Status()
}

// unassignPartitionSlot applies a confirmed unassignment. It detaches
// partition from its owner's AssignedPartitions, clears the slot's AssignedTo,
// and then requests a rebalance so the freed partition can be placed again.
// If the partition is untracked, its slot is unassigned, or the owner does not
// list it, the inconsistency is logged and nothing changes.
//
// The state change happens under m.mu, but the rebalance request is sent only
// after the lock is released. The balancing goroutine may be waiting for m.mu,
// so sending while holding it could deadlock. The send gives up once the
// market has been shut down.
func (m *Market) unassignPartitionSlot(partition topic.Partition) {
	detached := func() bool {
		m.mu.Lock()
		defer m.mu.Unlock()

		partitionSlot, exists := m.partitions[partition]
		if !exists {
			glog.V(0).Infof("partition %+v slot is not tracked", partition)
			return false
		}
		if partitionSlot.AssignedTo == nil {
			glog.V(0).Infof("partition %+v slot is not assigned to any consumer", partition)
			return false
		}

		consumer := partitionSlot.AssignedTo
		for i, p := range consumer.AssignedPartitions {
			if p == partition {
				consumer.AssignedPartitions = append(consumer.AssignedPartitions[:i], consumer.AssignedPartitions[i+1:]...)
				partitionSlot.AssignedTo = nil
				return true
			}
		}

		glog.V(0).Infof("partition %+v slot not found in assigned consumer", partition)
		return false
	}()
	if !detached {
		return
	}

	select {
	case m.balanceRequestChan <- struct{}{}:
	case <-m.stopChan:
	}
}

// confirmAssignPartition records that partition now belongs to the consumer
// identified by consumerInstanceId. It sets the slot's owner and appends the
// partition to that consumer's AssignedPartitions. It logs and makes no change
// if the partition is untracked, the slot already has an owner, or the
// consumer is not registered (checked in that order). It does not request a
// rebalance.
func (m *Market) confirmAssignPartition(partition topic.Partition, consumerInstanceId ConsumerGroupInstanceId) {
	m.mu.Lock()
	defer m.mu.Unlock()

	slot, tracked := m.partitions[partition]
	if !tracked {
		glog.V(0).Infof("partition %+v slot is not tracked", partition)
		return
	}
	if owner := slot.AssignedTo; owner != nil {
		glog.V(0).Infof("partition %+v slot is already assigned to %+v", partition, owner.InstanceId)
		return
	}
	instance, registered := m.consumerInstances[consumerInstanceId]
	if !registered {
		glog.V(0).Infof("consumer %+v is not tracked", consumerInstanceId)
		return
	}

	slot.AssignedTo = instance
	instance.AssignedPartitions = append(instance.AssignedPartitions, partition)
}

// Status logs, at glog verbosity 1, the number of partitions and consumer
// instances, the owner of every partition, and each consumer's partition
// count. It acquires m.mu, so it must not be called while the lock is held.
func (m *Market) Status() {
	m.mu.Lock()
	defer m.mu.Unlock()

	glog.V(1).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances))
	for partition, slot := range m.partitions {
		owner := slot.AssignedTo
		if owner != nil {
			glog.V(1).Infof("Partition %+v is assigned to consumer %+v", partition, owner.InstanceId)
			continue
		}
		glog.V(1).Infof("Partition %+v is not assigned to any consumer", partition)
	}
	for _, instance := range m.consumerInstances {
		glog.V(1).Infof("Consumer %+v has %d partitions", instance.InstanceId, len(instance.AssignedPartitions))
	}
}