diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub_coordinator.go | 30 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/balancer.go | 3 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/partition_list_broker.go | 10 | ||||
| -rw-r--r-- | weed/mq/sub_coordinator/consumer_group.go | 11 | ||||
| -rw-r--r-- | weed/mq/sub_coordinator/coordinator.go | 7 |
5 files changed, 31 insertions, 30 deletions
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index 349db3178..b8438b61f 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -37,24 +37,24 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess // process ack messages go func() { - for { - _, err := stream.Recv() - if err != nil { - glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err) - } + for { + _, err := stream.Recv() + if err != nil { + glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err) + } - select { - case <-ctx.Done(): - err := ctx.Err() - if err == context.Canceled { - // Client disconnected + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return + } return + default: + // Continue processing the request } - return - default: - // Continue processing the request } - } }() // send commands to subscriber @@ -68,7 +68,7 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess } glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err) return err - case message := <- cgi.ResponseChan: + case message := <-cgi.ResponseChan: if err := stream.Send(message); err != nil { glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err) } diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index 988b971af..5e8c8275e 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -32,9 +32,10 @@ type Balancer struct { // Collected from all brokers when they connect to the broker leader TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name } + func NewBalancer() *Balancer { return &Balancer{ - Brokers: cmap.New[*BrokerStats](), + Brokers: cmap.New[*BrokerStats](), TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](), } } diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go index 7ceb2a9fc..9dc6140b3 100644 --- a/weed/mq/pub_balancer/partition_list_broker.go +++ b/weed/mq/pub_balancer/partition_list_broker.go @@ -6,8 +6,8 @@ import ( ) type PartitionSlotToBroker struct { - RangeStart int32 - RangeStop int32 + RangeStart int32 + RangeStop int32 AssignedBroker string } @@ -36,12 +36,12 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broke } } ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{ - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, AssignedBroker: broker, }) } -func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) { +func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) { for _, partitionSlot := range ps.PartitionSlots { if partitionSlot.AssignedBroker == broker { partitionSlot.AssignedBroker = "" diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index be06a01f8..566a26ef7 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -6,28 +6,29 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) + type ConsumerGroupInstance struct { InstanceId string // the consumer group instance may not have an active partition - Partitions []*topic.Partition - ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse + Partitions []*topic.Partition + ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse } type ConsumerGroup struct { // map a consumer group instance id to a consumer group instance ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance] - mapping *PartitionConsumerMapping + mapping *PartitionConsumerMapping } func NewConsumerGroup() *ConsumerGroup { return &ConsumerGroup{ ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](), - mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount), + mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount), } } func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance { return &ConsumerGroupInstance{ - InstanceId: instanceId, + InstanceId: instanceId, ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1), } } diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index f4d65ea5b..7ca536c6b 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -6,7 +6,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) - type TopicConsumerGroups struct { // map a consumer group name to a consumer group ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup] @@ -19,13 +18,13 @@ type TopicConsumerGroups struct { type Coordinator struct { // map topic name to consumer groups TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] - balancer *pub_balancer.Balancer + balancer *pub_balancer.Balancer } func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator { return &Coordinator{ TopicSubscribers: cmap.New[*TopicConsumerGroups](), - balancer: balancer, + balancer: balancer, } } @@ -50,7 +49,7 @@ func toTopicName(topic *mq_pb.Topic) string { return topicName } -func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance{ +func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance { tcg := c.GetTopicConsumerGroups(topic) cg, _ := tcg.ConsumerGroups.Get(consumerGroup) if cg == nil { |
