aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-12-28 21:13:49 -0800
committerchrislu <chris.lu@gmail.com>2023-12-28 21:13:49 -0800
commit28452303298ab84696521132fae5db9fc3d5b478 (patch)
treea492951e32d2f325dc2c7112056596d63950c00e
parent093fdc16213b64b0f368437f93adf0448ccf9e59 (diff)
downloadseaweedfs-28452303298ab84696521132fae5db9fc3d5b478.tar.xz
seaweedfs-28452303298ab84696521132fae5db9fc3d5b478.zip
passing broker into the assignments
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go3
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go21
-rw-r--r--weed/mq/sub_coordinator/partition_consumer_mapping.go21
-rw-r--r--weed/mq/sub_coordinator/partition_consumer_mapping_test.go20
-rw-r--r--weed/mq/sub_coordinator/partition_list.go11
5 files changed, 35 insertions, 41 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index ae821bfdd..a2f7c7a3d 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -78,7 +78,7 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
go func(partition *mq_pb.Partition, broker string) {
defer wg.Done()
defer func() { <-semaphore }()
- glog.V(0).Infof("subscriber %s/%s/%s assigned partition %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition)
+ glog.V(0).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
sub.onEachPartition(partition, broker)
}(assigned.Partition, assigned.Broker)
}
@@ -87,5 +87,4 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
}
func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) {
- glog.V(0).Infof("subscriber %s/%s/%s processing partition %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition)
}
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index 9bd020ad3..dad93dfe5 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -40,19 +40,19 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
}
}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
- cg.onConsumerGroupInstanceChange()
+ cg.onConsumerGroupInstanceChange("add consumer instance "+ consumerGroupInstance)
}
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
- cg.onConsumerGroupInstanceChange()
+ cg.onConsumerGroupInstanceChange("remove consumer instance "+ consumerGroupInstance)
}
-func (cg *ConsumerGroup) onConsumerGroupInstanceChange(){
+func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() {
- cg.RebalanceConsumberGroupInstances()
+ cg.RebalanceConsumberGroupInstances(reason)
cg.reBalanceTimer = nil
})
}
@@ -61,11 +61,11 @@ func (cg *ConsumerGroup) OnPartitionListChange() {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
- cg.RebalanceConsumberGroupInstances()
+ cg.RebalanceConsumberGroupInstances("partition list change")
}
-func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() {
- println("rebalance...")
+func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) {
+ println("rebalance due to", reason, "...")
now := time.Now().UnixNano()
@@ -75,10 +75,6 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() {
glog.V(0).Infof("topic %s not found in balancer", cg.topic.String())
return
}
- partitions := make([]*topic.Partition, 0)
- for _, partitionSlot := range partitionSlotToBrokerList.PartitionSlots {
- partitions = append(partitions, topic.NewPartition(partitionSlot.RangeStart, partitionSlot.RangeStop, partitionSlotToBrokerList.RingSize, now))
- }
// collect current consumer group instance ids
consumerInstanceIds := make([]string, 0)
@@ -86,7 +82,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() {
consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId)
}
- cg.mapping.BalanceToConsumerInstanceIds(partitions, consumerInstanceIds)
+ cg.mapping.BalanceToConsumerInstanceIds(partitionSlotToBrokerList, consumerInstanceIds)
// convert cg.mapping currentMapping to map of consumer group instance id to partition slots
consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance)
@@ -110,6 +106,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() {
RingSize: partitionSlotToBrokerList.RingSize,
UnixTimeNs: now,
},
+ Broker: partitionSlot.Broker,
}
}
response := &mq_pb.SubscriberToSubCoordinatorResponse{
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go
index b7e5b12c6..ae2bf1c17 100644
--- a/weed/mq/sub_coordinator/partition_consumer_mapping.go
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go
@@ -2,7 +2,7 @@ package sub_coordinator
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"time"
)
@@ -23,19 +23,19 @@ func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
// 2. allow one consumer instance to be down unexpectedly
// without affecting the processing power utilization
-func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitions []*topic.Partition, consumerInstanceIds []string) {
- if len(partitions) == 0 || len(consumerInstanceIds) == 0 {
+func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string) {
+ if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstanceIds) == 0 {
return
}
newVersion := time.Now().UnixNano()
- newMapping := NewPartitionSlotToConsumerInstanceList(partitions[0].RingSize, newVersion)
+ newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, newVersion)
var prevMapping *PartitionSlotToConsumerInstanceList
if len(pcm.prevMappings) > 0 {
prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
} else {
prevMapping = nil
}
- newMapping.PartitionSlots = doBalanceSticky(partitions, consumerInstanceIds, prevMapping)
+ newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstanceIds, prevMapping)
if pcm.currentMapping != nil {
pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
if len(pcm.prevMappings) > 10 {
@@ -45,7 +45,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitions []*
pcm.currentMapping = newMapping
}
-func doBalanceSticky(partitions []*topic.Partition, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
+func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
// collect previous consumer instance ids
prevConsumerInstanceIds := make(map[string]struct{})
if prevMapping != nil {
@@ -79,7 +79,14 @@ func doBalanceSticky(partitions []*topic.Partition, consumerInstanceIds []string
}
// make a copy of old mapping, skipping the deleted consumer instances
- newPartitionSlots := ToPartitionSlots(partitions)
+ newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions))
+ for _, partition := range partitions {
+ newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ Broker: partition.AssignedBroker,
+ })
+ }
for _, newPartitionSlot := range newPartitionSlots {
key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop)
if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok {
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
index 1d3050ef4..9a9abe011 100644
--- a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
@@ -1,14 +1,14 @@
package sub_coordinator
import (
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"reflect"
"testing"
)
func Test_doBalanceSticky(t *testing.T) {
type args struct {
- partitions []*topic.Partition
+ partitions []*pub_balancer.PartitionSlotToBroker
consumerInstanceIds []string
prevMapping *PartitionSlotToConsumerInstanceList
}
@@ -20,7 +20,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "1 consumer instance, 1 partition",
args: args{
- partitions: []*topic.Partition{
+ partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 100,
@@ -40,7 +40,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 1 partition",
args: args{
- partitions: []*topic.Partition{
+ partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 100,
@@ -60,7 +60,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "1 consumer instance, 2 partitions",
args: args{
- partitions: []*topic.Partition{
+ partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@@ -89,7 +89,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions",
args: args{
- partitions: []*topic.Partition{
+ partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@@ -118,7 +118,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions, 1 deleted consumer instance",
args: args{
- partitions: []*topic.Partition{
+ partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@@ -160,7 +160,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions, 1 new consumer instance",
args: args{
- partitions: []*topic.Partition{
+ partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@@ -202,7 +202,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions, 1 new partition",
args: args{
- partitions: []*topic.Partition{
+ partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@@ -253,7 +253,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions, 1 new partition, 1 new consumer instance",
args: args{
- partitions: []*topic.Partition{
+ partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go
index 1c3123bfc..b559007b5 100644
--- a/weed/mq/sub_coordinator/partition_list.go
+++ b/weed/mq/sub_coordinator/partition_list.go
@@ -5,6 +5,7 @@ import "github.com/seaweedfs/seaweedfs/weed/mq/topic"
type PartitionSlotToConsumerInstance struct {
RangeStart int32
RangeStop int32
+ Broker string
AssignedInstanceId string
}
@@ -21,16 +22,6 @@ func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *Part
}
}
-func ToPartitionSlots(partitions []*topic.Partition) (partitionSlots []*PartitionSlotToConsumerInstance) {
- for _, partition := range partitions {
- partitionSlots = append(partitionSlots, &PartitionSlotToConsumerInstance{
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- })
- }
- return
-}
-
func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance, unixTimeNs int64) []*topic.Partition {
partitions := make([]*topic.Partition, 0, len(slots))
for _, slot := range slots {