aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go6
-rw-r--r--weed/mq/sub_coordinator/partition_consumer_mapping.go28
-rw-r--r--weed/mq/sub_coordinator/partition_consumer_mapping_test.go90
3 files changed, 98 insertions, 26 deletions
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index 882e7ddf9..020f459b6 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -84,12 +84,12 @@ func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBroke
}
// collect current consumer group instance ids
- var consumerInstanceIds []string
+ var consumerInstances []*ConsumerGroupInstance
for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() {
- consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId)
+ consumerInstances = append(consumerInstances, consumerGroupInstance)
}
- cg.mapping.BalanceToConsumerInstanceIds(partitionSlotToBrokerList, consumerInstanceIds)
+ cg.mapping.BalanceToConsumerInstances(partitionSlotToBrokerList, consumerInstances)
// convert cg.mapping currentMapping to map of consumer group instance id to partition slots
consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance)
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go
index c7f104af1..c5c4f6866 100644
--- a/weed/mq/sub_coordinator/partition_consumer_mapping.go
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go
@@ -23,8 +23,8 @@ func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
// 2. allow one consumer instance to be down unexpectedly
// without affecting the processing power utilization
-func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string) {
- if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstanceIds) == 0 {
+func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstances []*ConsumerGroupInstance) {
+ if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstances) == 0 {
return
}
newVersion := time.Now().UnixNano()
@@ -35,7 +35,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotT
} else {
prevMapping = nil
}
- newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstanceIds, prevMapping)
+ newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstances, prevMapping)
if pcm.currentMapping != nil {
pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
if len(pcm.prevMappings) > 10 {
@@ -45,7 +45,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotT
pcm.currentMapping = newMapping
}
-func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
+func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstances []*ConsumerGroupInstance, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
// collect previous consumer instance ids
prevConsumerInstanceIds := make(map[string]struct{})
if prevMapping != nil {
@@ -57,8 +57,8 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI
}
// collect current consumer instance ids
currConsumerInstanceIds := make(map[string]struct{})
- for _, consumerInstanceId := range consumerInstanceIds {
- currConsumerInstanceIds[consumerInstanceId] = struct{}{}
+ for _, consumerInstance := range consumerInstances {
+ currConsumerInstanceIds[consumerInstance.InstanceId] = struct{}{}
}
// check deleted consumer instances
@@ -106,25 +106,25 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI
}
}
// average number of partitions that are assigned to each consumer instance
- averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstanceIds))
+ averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstances))
// assign unassigned partition slots to consumer instances that is underloaded
consumerInstanceIdsIndex := 0
for _, newPartitionSlot := range newPartitionSlots {
if newPartitionSlot.AssignedInstanceId == "" {
- for avoidDeadLoop := len(consumerInstanceIds); avoidDeadLoop > 0; avoidDeadLoop-- {
- consumerInstanceId := consumerInstanceIds[consumerInstanceIdsIndex]
- if float32(consumerInstancePartitionCount[consumerInstanceId]) < averageConsumerInstanceLoad {
- newPartitionSlot.AssignedInstanceId = consumerInstanceId
- consumerInstancePartitionCount[consumerInstanceId]++
+ for avoidDeadLoop := len(consumerInstances); avoidDeadLoop > 0; avoidDeadLoop-- {
+ consumerInstance := consumerInstances[consumerInstanceIdsIndex]
+ if float32(consumerInstancePartitionCount[consumerInstance.InstanceId]) < averageConsumerInstanceLoad {
+ newPartitionSlot.AssignedInstanceId = consumerInstance.InstanceId
+ consumerInstancePartitionCount[consumerInstance.InstanceId]++
consumerInstanceIdsIndex++
- if consumerInstanceIdsIndex >= len(consumerInstanceIds) {
+ if consumerInstanceIdsIndex >= len(consumerInstances) {
consumerInstanceIdsIndex = 0
}
break
} else {
consumerInstanceIdsIndex++
- if consumerInstanceIdsIndex >= len(consumerInstanceIds) {
+ if consumerInstanceIdsIndex >= len(consumerInstances) {
consumerInstanceIdsIndex = 0
}
}
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
index 9a9abe011..7dcfc6f9b 100644
--- a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
@@ -9,7 +9,7 @@ import (
func Test_doBalanceSticky(t *testing.T) {
type args struct {
partitions []*pub_balancer.PartitionSlotToBroker
- consumerInstanceIds []string
+ consumerInstanceIds []*ConsumerGroupInstance
prevMapping *PartitionSlotToConsumerInstanceList
}
tests := []struct {
@@ -26,7 +26,12 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100,
},
},
- consumerInstanceIds: []string{"consumer-instance-1"},
+ consumerInstanceIds: []*ConsumerGroupInstance{
+ {
+ InstanceId: "consumer-instance-1",
+ MaxPartitionCount: 1,
+ },
+ },
prevMapping: nil,
},
wantPartitionSlots: []*PartitionSlotToConsumerInstance{
@@ -46,7 +51,16 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100,
},
},
- consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
+ consumerInstanceIds: []*ConsumerGroupInstance{
+ {
+ InstanceId: "consumer-instance-1",
+ MaxPartitionCount: 1,
+ },
+ {
+ InstanceId: "consumer-instance-2",
+ MaxPartitionCount: 1,
+ },
+ },
prevMapping: nil,
},
wantPartitionSlots: []*PartitionSlotToConsumerInstance{
@@ -70,7 +84,12 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100,
},
},
- consumerInstanceIds: []string{"consumer-instance-1"},
+ consumerInstanceIds: []*ConsumerGroupInstance{
+ {
+ InstanceId: "consumer-instance-1",
+ MaxPartitionCount: 1,
+ },
+ },
prevMapping: nil,
},
wantPartitionSlots: []*PartitionSlotToConsumerInstance{
@@ -99,7 +118,16 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100,
},
},
- consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
+ consumerInstanceIds: []*ConsumerGroupInstance{
+ {
+ InstanceId: "consumer-instance-1",
+ MaxPartitionCount: 1,
+ },
+ {
+ InstanceId: "consumer-instance-2",
+ MaxPartitionCount: 1,
+ },
+ },
prevMapping: nil,
},
wantPartitionSlots: []*PartitionSlotToConsumerInstance{
@@ -128,7 +156,16 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100,
},
},
- consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
+ consumerInstanceIds: []*ConsumerGroupInstance{
+ {
+ InstanceId: "consumer-instance-1",
+ MaxPartitionCount: 1,
+ },
+ {
+ InstanceId: "consumer-instance-2",
+ MaxPartitionCount: 1,
+ },
+ },
prevMapping: &PartitionSlotToConsumerInstanceList{
PartitionSlots: []*PartitionSlotToConsumerInstance{
{
@@ -170,7 +207,20 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100,
},
},
- consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"},
+ consumerInstanceIds: []*ConsumerGroupInstance{
+ {
+ InstanceId: "consumer-instance-1",
+ MaxPartitionCount: 1,
+ },
+ {
+ InstanceId: "consumer-instance-2",
+ MaxPartitionCount: 1,
+ },
+ {
+ InstanceId: "consumer-instance-3",
+ MaxPartitionCount: 1,
+ },
+ },
prevMapping: &PartitionSlotToConsumerInstanceList{
PartitionSlots: []*PartitionSlotToConsumerInstance{
{
@@ -216,7 +266,16 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 150,
},
},
- consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
+ consumerInstanceIds: []*ConsumerGroupInstance{
+ {
+ InstanceId: "consumer-instance-1",
+ MaxPartitionCount: 1,
+ },
+ {
+ InstanceId: "consumer-instance-2",
+ MaxPartitionCount: 1,
+ },
+ },
prevMapping: &PartitionSlotToConsumerInstanceList{
PartitionSlots: []*PartitionSlotToConsumerInstance{
{
@@ -267,7 +326,20 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 150,
},
},
- consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"},
+ consumerInstanceIds: []*ConsumerGroupInstance{
+ {
+ InstanceId: "consumer-instance-1",
+ MaxPartitionCount: 1,
+ },
+ {
+ InstanceId: "consumer-instance-2",
+ MaxPartitionCount: 1,
+ },
+ {
+ InstanceId: "consumer-instance-3",
+ MaxPartitionCount: 1,
+ },
+ },
prevMapping: &PartitionSlotToConsumerInstanceList{
PartitionSlots: []*PartitionSlotToConsumerInstance{
{