aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/sub_coordinator/market.go346
-rw-r--r--weed/mq/sub_coordinator/market_test.go103
2 files changed, 449 insertions, 0 deletions
diff --git a/weed/mq/sub_coordinator/market.go b/weed/mq/sub_coordinator/market.go
new file mode 100644
index 000000000..48b9ae649
--- /dev/null
+++ b/weed/mq/sub_coordinator/market.go
@@ -0,0 +1,346 @@
+package sub_coordinator
+
+import (
+ "errors"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "sync"
+ "time"
+)
+
+/*
+Market is a data structure that keeps track of the state of the consumer group instances and the partitions.
+
+When rebalancing, the market will try to balance the load of the partitions among the consumer group instances.
+For each loop, the market will:
+* If a consumer group instance has more partitions than the average, it will unassign some partitions.
+* If a consumer group instance has less partitions than the average, it will assign some partitions.
+
+Trigger rebalance when:
+* A new consumer group instance is added
+* Some partitions are unassigned from a consumer group instance.
+
+If multiple rebalance requests are received, after a certain period the market will only process the latest request.
+
+However, if the number of unassigned partitions increases to exactly the total number of partitions,
+and the total number of partitions is less than or equal to the sum of the max partition counts of all consumer group instances,
+the market will process the request immediately.
+This is to ensure a partition can be migrated to another consumer group instance as soon as possible.
+
+Emit these adjustments to the subscriber coordinator:
+* Assign a partition to a consumer group instance
+* Unassign a partition from a consumer group instance
+
+Because the adjustment is sent to the subscriber coordinator, the market will keep track of the inflight adjustments.
+The subscriber coordinator will send back the response to the market when the adjustment is processed.
+If the adjustment is older than a certain time(inflightAdjustmentTTL), it would be considered expired.
+Otherwise, the adjustment is considered inflight, so it would be used when calculating the load.
+
+Later features:
+* A consumer group instance is not keeping up with the load.
+
+Since a coordinator, and thus the market, may be restarted or moved to another node, the market should be able to recover the state from the subscriber coordinator.
+The subscriber coordinator should be able to send the current state of the consumer group instances and the partitions to the market.
+
+*/
+
// PartitionSlot tracks one topic partition and which consumer group
// instance (if any) it is currently assigned to.
type PartitionSlot struct {
	Partition  topic.Partition
	AssignedTo *ConsumerGroupInstance // Track the consumer assigned to this partition slot; nil when unassigned
}
+
// Adjustment is a single assign/unassign instruction emitted to the
// subscriber coordinator via AdjustmentChan. It is considered "inflight"
// (and counted toward a consumer's load) until it is confirmed through
// ConfirmAdjustment or becomes older than inflightAdjustmentTTL.
type Adjustment struct {
	isAssign  bool // true = assign the partition to consumer, false = unassign it
	partition topic.Partition
	consumer  ConsumerGroupInstanceId
	ts        time.Time // creation time, used for TTL-based expiry of inflight adjustments
}
+
// Market keeps the assignment state of partitions and consumer group
// instances and emits Adjustments to balance load (see the package-level
// comment above for the balancing protocol).
type Market struct {
	mu                    sync.Mutex
	partitions            map[topic.Partition]*PartitionSlot
	consumerInstances     map[ConsumerGroupInstanceId]*ConsumerGroupInstance
	AdjustmentChan        chan *Adjustment // emitted assign/unassign instructions; closed by ShutdownMarket
	inflightAdjustments   []*Adjustment    // emitted but not yet confirmed/expired; counted as load
	inflightAdjustmentTTL time.Duration    // age after which an unconfirmed adjustment is ignored
	lastBalancedTime      time.Time
	stopChan              chan struct{} // closed to stop loopBalanceLoad
	balanceRequestChan    chan struct{} // signals loopBalanceLoad that a rebalance is wanted
	hasBalanceRequest     bool          // set/cleared only inside loopBalanceLoad to coalesce requests
}
+
+func NewMarket(partitions []topic.Partition, inflightAdjustmentTTL time.Duration) *Market {
+ partitionMap := make(map[topic.Partition]*PartitionSlot)
+ for _, partition := range partitions {
+ partitionMap[partition] = &PartitionSlot{
+ Partition: partition,
+ }
+ }
+ m := &Market{
+ partitions: partitionMap,
+ consumerInstances: make(map[ConsumerGroupInstanceId]*ConsumerGroupInstance),
+ AdjustmentChan: make(chan *Adjustment, 100),
+ inflightAdjustmentTTL: inflightAdjustmentTTL,
+ stopChan: make(chan struct{}),
+ balanceRequestChan: make(chan struct{}),
+ }
+ m.lastBalancedTime = time.Now()
+ go m.loopBalanceLoad()
+
+ return m
+}
+
// ShutdownMarket stops the background balancing loop and closes
// AdjustmentChan so consumers ranging over it terminate.
//
// NOTE(review): AdjustmentChan is closed here while loopBalanceLoad (its
// sender) may still be mid-send, which can panic with "send on closed
// channel"; and once the loop has exited, later sends to balanceRequestChan
// (e.g. from AddConsumerInstance) block forever. Confirm the intended
// shutdown ordering with callers.
func (m *Market) ShutdownMarket() {
	close(m.stopChan)
	close(m.AdjustmentChan)
}
+
+func (m *Market) AddConsumerInstance(consumer *ConsumerGroupInstance) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if _, exists := m.consumerInstances[consumer.InstanceId]; exists {
+ return errors.New("consumer instance already exists")
+ }
+
+ m.consumerInstances[consumer.InstanceId] = consumer
+ m.balanceRequestChan <- struct{}{}
+
+ return nil
+}
+
+func (m *Market) RemoveConsumerInstance(consumerId ConsumerGroupInstanceId) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ consumer, exists := m.consumerInstances[consumerId]
+ if !exists {
+ return nil
+ }
+ delete(m.consumerInstances, consumerId)
+
+ for _, partition := range consumer.AssignedPartitions {
+ if partitionSlot, exists := m.partitions[partition]; exists {
+ partitionSlot.AssignedTo = nil
+ }
+ }
+ m.balanceRequestChan <- struct{}{}
+
+ return nil
+}
+
+func (m *Market) assignPartitionToConsumer(partition *PartitionSlot) {
+ var bestConsumer *ConsumerGroupInstance
+ var minLoad = int(^uint(0) >> 1) // Max int value
+
+ inflightConsumerAdjustments := make(map[ConsumerGroupInstanceId]int)
+ for _, adjustment := range m.inflightAdjustments {
+ if adjustment.isAssign {
+ inflightConsumerAdjustments[adjustment.consumer]++
+ } else {
+ inflightConsumerAdjustments[adjustment.consumer]--
+ }
+ }
+ for _, consumer := range m.consumerInstances {
+ consumerLoad := len(consumer.AssignedPartitions)
+ if inflightAdjustments, exists := inflightConsumerAdjustments[consumer.InstanceId]; exists {
+ consumerLoad += inflightAdjustments
+ }
+ fmt.Printf("Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad)
+ if consumerLoad < int(consumer.MaxPartitionCount) {
+ if consumerLoad < minLoad {
+ bestConsumer = consumer
+ minLoad = consumerLoad
+ fmt.Printf("picked: Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad)
+ }
+ }
+ }
+
+ if bestConsumer != nil {
+ adjustment := &Adjustment{
+ isAssign: true,
+ partition: partition.Partition,
+ consumer: bestConsumer.InstanceId,
+ ts: time.Now(),
+ }
+ m.AdjustmentChan <- adjustment
+ m.inflightAdjustments = append(m.inflightAdjustments, adjustment)
+ m.lastBalancedTime = adjustment.ts
+ }
+}
+
// loopBalanceLoad is the background balancing loop started by NewMarket.
// Requests arriving on balanceRequestChan are coalesced: the loop only
// records that a request is pending, and a 500ms ticker performs the actual
// rebalance, so a burst of requests triggers a single pass. The loop exits
// when stopChan is closed by ShutdownMarket.
//
// NOTE(review): doBalanceLoad reads m.consumerInstances, m.partitions and
// m.inflightAdjustments without holding m.mu, while AddConsumerInstance /
// RemoveConsumerInstance mutate them under the lock — this looks like a
// data race; verify with `go test -race`.
func (m *Market) loopBalanceLoad() {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if m.hasBalanceRequest {
				m.hasBalanceRequest = false
				// Drop inflight adjustments older than inflightAdjustmentTTL
				// before balancing so expired ones stop counting as load.
				inflightAdjustments := make([]*Adjustment, 0, len(m.inflightAdjustments))
				for _, adjustment := range m.inflightAdjustments {
					if adjustment.ts.Add(m.inflightAdjustmentTTL).After(time.Now()) {
						inflightAdjustments = append(inflightAdjustments, adjustment)
					}
				}
				m.inflightAdjustments = inflightAdjustments

				m.doBalanceLoad()
			}
		case <-m.balanceRequestChan:
			m.hasBalanceRequest = true
		case <-m.stopChan:
			return
		}
	}
}
+
+// doBalanceLoad will balance the load of the partitions among the consumer group instances.
+// It will try to unassign partitions from the consumer group instances that have more partitions than the average.
+// It will try to assign partitions to the consumer group instances that have less partitions than the average.
+func (m *Market) doBalanceLoad() {
+ if len(m.consumerInstances) == 0 {
+ return
+ }
+
+ // find the average load for all consumers
+ averageLoad := m.findAverageLoad()
+
+ // find the consumers with the higher load than average
+ if m.adjustBusyConsumers(averageLoad) {
+ return
+ }
+
+ // find partitions with no consumer assigned
+ m.adjustUnassignedPartitions()
+}
+func (m *Market) findAverageLoad() (averageLoad float32) {
+ var totalLoad int
+ for _, consumer := range m.consumerInstances {
+ totalLoad += len(consumer.AssignedPartitions)
+ }
+ for _, adjustment := range m.inflightAdjustments {
+ if adjustment.isAssign {
+ totalLoad++
+ } else {
+ totalLoad--
+ }
+ }
+ averageLoad = float32(totalLoad) / float32(len(m.consumerInstances))
+ return
+}
+
+func (m *Market) adjustBusyConsumers(averageLoad float32) (hasAdjustments bool) {
+ inflightConsumerAdjustments := make(map[ConsumerGroupInstanceId]int)
+ for _, adjustment := range m.inflightAdjustments {
+ if adjustment.isAssign {
+ inflightConsumerAdjustments[adjustment.consumer]++
+ } else {
+ inflightConsumerAdjustments[adjustment.consumer]--
+ }
+ }
+ for _, consumer := range m.consumerInstances {
+ consumerLoad := len(consumer.AssignedPartitions)
+ if inflightAdjustment, exists := inflightConsumerAdjustments[consumer.InstanceId]; exists {
+ consumerLoad += inflightAdjustment
+ }
+ delta := int(float32(consumerLoad) - averageLoad)
+ if delta <= 0 {
+ continue
+ }
+ adjustTime := time.Now()
+ for i := 0; i < delta; i++ {
+ adjustment := &Adjustment{
+ isAssign: false,
+ partition: consumer.AssignedPartitions[i],
+ consumer: consumer.InstanceId,
+ ts: adjustTime,
+ }
+ m.AdjustmentChan <- adjustment
+ m.inflightAdjustments = append(m.inflightAdjustments, adjustment)
+ m.lastBalancedTime = adjustment.ts
+ }
+ hasAdjustments = true
+ }
+ return
+}
+
+func (m *Market) adjustUnassignedPartitions() {
+ inflightPartitionAdjustments := make(map[topic.Partition]bool)
+ for _, adjustment := range m.inflightAdjustments {
+ inflightPartitionAdjustments[adjustment.partition] = true
+ }
+ for _, partitionSlot := range m.partitions {
+ if partitionSlot.AssignedTo == nil {
+ if _, exists := inflightPartitionAdjustments[partitionSlot.Partition]; exists {
+ continue
+ }
+ fmt.Printf("Assigning partition %+v to consumer\n", partitionSlot.Partition)
+ m.assignPartitionToConsumer(partitionSlot)
+ }
+ }
+}
+
+func (m *Market) ConfirmAdjustment(adjustment *Adjustment) {
+ if adjustment.isAssign {
+ m.confirmAssignPartition(adjustment.partition, adjustment.consumer)
+ } else {
+ m.unassignPartitionSlot(adjustment.partition)
+ }
+}
+
+func (m *Market) unassignPartitionSlot(partition topic.Partition) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ partitionSlot, exists := m.partitions[partition]
+ if !exists {
+ glog.V(0).Infof("partition %+v slot is not tracked", partition)
+ return
+ }
+
+ if partitionSlot.AssignedTo == nil {
+ glog.V(0).Infof("partition %+v slot is not assigned to any consumer", partition)
+ return
+ }
+
+ consumer := partitionSlot.AssignedTo
+ for i, p := range consumer.AssignedPartitions {
+ if p == partition {
+ consumer.AssignedPartitions = append(consumer.AssignedPartitions[:i], consumer.AssignedPartitions[i+1:]...)
+ partitionSlot.AssignedTo = nil
+ m.balanceRequestChan <- struct{}{}
+ return
+ }
+ }
+
+ glog.V(0).Infof("partition %+v slot not found in assigned consumer", partition)
+
+}
+
+func (m *Market) confirmAssignPartition(partition topic.Partition, consumerInstanceId ConsumerGroupInstanceId) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ partitionSlot, exists := m.partitions[partition]
+ if !exists {
+ glog.V(0).Infof("partition %+v slot is not tracked", partition)
+ return
+ }
+
+ if partitionSlot.AssignedTo != nil {
+ glog.V(0).Infof("partition %+v slot is already assigned to %+v", partition, partitionSlot.AssignedTo.InstanceId)
+ return
+ }
+
+ consumerInstance, exists := m.consumerInstances[consumerInstanceId]
+ if !exists {
+ glog.V(0).Infof("consumer %+v is not tracked", consumerInstanceId)
+ return
+ }
+
+ partitionSlot.AssignedTo = consumerInstance
+ consumerInstance.AssignedPartitions = append(consumerInstance.AssignedPartitions, partition)
+
+}
diff --git a/weed/mq/sub_coordinator/market_test.go b/weed/mq/sub_coordinator/market_test.go
new file mode 100644
index 000000000..150a88a8d
--- /dev/null
+++ b/weed/mq/sub_coordinator/market_test.go
@@ -0,0 +1,103 @@
+package sub_coordinator
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/stretchr/testify/assert"
+)
+
// partitions is a shared fixture: three contiguous, non-overlapping slices
// of a ring of size 3, used by every test below.
var partitions = []topic.Partition{
	{
		RangeStart: 0,
		RangeStop:  1,
		RingSize:   3,
		UnixTimeNs: 0,
	},
	{
		RangeStart: 1,
		RangeStop:  2,
		RingSize:   3,
		UnixTimeNs: 0,
	},
	{
		RangeStart: 2,
		RangeStop:  3,
		RingSize:   3,
		UnixTimeNs: 0,
	},
}
+
+func TestAddConsumerInstance(t *testing.T) {
+ market := NewMarket(partitions, 10*time.Second)
+
+ consumer := &ConsumerGroupInstance{
+ InstanceId: "first",
+ MaxPartitionCount: 2,
+ }
+ err := market.AddConsumerInstance(consumer)
+
+ assert.Nil(t, err)
+ time.Sleep(1 * time.Second) // Allow time for background rebalancing
+ market.ShutdownMarket()
+ for adjustment := range market.AdjustmentChan {
+ fmt.Printf("%+v\n", adjustment)
+ }
+}
+
+func TestMultipleConsumerInstances(t *testing.T) {
+ market := NewMarket(partitions, 10*time.Second)
+
+ market.AddConsumerInstance(&ConsumerGroupInstance{
+ InstanceId: "first",
+ MaxPartitionCount: 2,
+ })
+ market.AddConsumerInstance(&ConsumerGroupInstance{
+ InstanceId: "second",
+ MaxPartitionCount: 2,
+ })
+ market.AddConsumerInstance(&ConsumerGroupInstance{
+ InstanceId: "third",
+ MaxPartitionCount: 2,
+ })
+
+ time.Sleep(1 * time.Second) // Allow time for background rebalancing
+ market.ShutdownMarket()
+ for adjustment := range market.AdjustmentChan {
+ fmt.Printf("%+v\n", adjustment)
+ }
+}
+
+func TestConfirmAdjustment(t *testing.T) {
+ market := NewMarket(partitions, 1*time.Second)
+
+ market.AddConsumerInstance(&ConsumerGroupInstance{
+ InstanceId: "first",
+ MaxPartitionCount: 2,
+ })
+ market.AddConsumerInstance(&ConsumerGroupInstance{
+ InstanceId: "second",
+ MaxPartitionCount: 2,
+ })
+ market.AddConsumerInstance(&ConsumerGroupInstance{
+ InstanceId: "third",
+ MaxPartitionCount: 2,
+ })
+
+ go func() {
+ time.Sleep(5 * time.Second) // Allow time for background rebalancing
+ market.ShutdownMarket()
+ }()
+ go func() {
+ time.Sleep(2 * time.Second)
+ market.RemoveConsumerInstance("third")
+ }()
+
+ for adjustment := range market.AdjustmentChan {
+ fmt.Printf("%+v\n", adjustment)
+ market.ConfirmAdjustment(adjustment)
+ }
+
+}