aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-03 13:30:30 -0800
committerchrislu <chris.lu@gmail.com>2024-01-03 13:30:30 -0800
commit35869b5c8090af92475fadf4766892f4db5adf7b (patch)
treeef356c3de16c3a398bc0d402bf2db3afc971b9a9
parentefb695fd931619831c57a34f61ee3bdf9e5d2339 (diff)
downloadseaweedfs-35869b5c8090af92475fadf4766892f4db5adf7b.tar.xz
seaweedfs-35869b5c8090af92475fadf4766892f4db5adf7b.zip
subscriber can be notified of the assignment change when topic is just configured
Next: Subscriber needs to read by the timestamp offset.
-rw-r--r--weed/cluster/lock_client.go2
-rw-r--r--weed/mq/broker/broker_server.go4
-rw-r--r--weed/mq/client/cmd/weed_sub/subscriber.go4
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go4
-rw-r--r--weed/mq/pub_balancer/balancer.go1
-rw-r--r--weed/mq/pub_balancer/lookup.go6
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go29
-rw-r--r--weed/mq/sub_coordinator/coordinator.go30
8 files changed, 55 insertions, 25 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index c21f20874..16f7abc8c 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -124,7 +124,7 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st
}
func (lock *LiveLock) IsLocked() bool {
- return lock.isLocked
+ return lock!=nil && lock.isLocked
}
func (lock *LiveLock) StopLock() error {
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 52b34ddbc..34a263032 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -47,6 +47,7 @@ type MessageQueueBroker struct {
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
pub_broker_balancer := pub_balancer.NewBalancer()
+ coordinator := sub_coordinator.NewCoordinator(pub_broker_balancer)
mqBroker = &MessageQueueBroker{
option: option,
@@ -55,9 +56,10 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
Balancer: pub_broker_balancer,
- Coordinator: sub_coordinator.NewCoordinator(pub_broker_balancer),
+ Coordinator: coordinator,
}
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
+ pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
go mqBroker.MasterClient.KeepConnectedToMaster()
diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go
index 16c9fdb4d..310e5ac78 100644
--- a/weed/mq/client/cmd/weed_sub/subscriber.go
+++ b/weed/mq/client/cmd/weed_sub/subscriber.go
@@ -30,11 +30,11 @@ func main() {
Namespace: *namespace,
Topic: *topic,
Filter: "",
- StartTime: time.Now(),
+ StartTime: time.Unix(0, 0),
}
processorConfig := sub_client.ProcessorConfiguration{
- ConcurrentPartitionLimit: 1,
+ ConcurrentPartitionLimit: 6,
}
brokers := strings.Split(*seedBrokers, ",")
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index e60243083..afafb15ea 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -145,7 +145,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
glog.V(3).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
- return fmt.Errorf("subscribe error: %v", err)
+ return fmt.Errorf("subscribe recv: %v", err)
}
if resp.Message == nil {
continue
@@ -156,10 +156,10 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
if processErr != nil {
return fmt.Errorf("process error: %v", processErr)
}
+ sub.alreadyProcessedTsNs = m.Data.TsNs
if !shouldContinue {
return nil
}
- sub.alreadyProcessedTsNs = m.Data.TsNs
case *mq_pb.SubscribeResponse_Ctrl:
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return io.EOF
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go
index 4524d95d0..0bcbdd51b 100644
--- a/weed/mq/pub_balancer/balancer.go
+++ b/weed/mq/pub_balancer/balancer.go
@@ -32,6 +32,7 @@ type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
// Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
+ OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
}
func NewBalancer() *Balancer {
diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go
index 33c5a864b..9379a341d 100644
--- a/weed/mq/pub_balancer/lookup.go
+++ b/weed/mq/pub_balancer/lookup.go
@@ -33,7 +33,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
}
}
}
- if len(assignments) > 0 || !publish {
+ if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) {
// glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments)
return assignments, nil
}
@@ -48,5 +48,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
if balancer.Brokers.IsEmpty() {
return nil, ErrNoBroker
}
- return allocateTopicPartitions(balancer.Brokers, partitionCount), nil
+ assignments = allocateTopicPartitions(balancer.Brokers, partitionCount)
+ balancer.OnPartitionChange(topic, assignments)
+ return
}
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index dad93dfe5..9b62f616e 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -52,32 +52,40 @@ func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){
cg.reBalanceTimer = nil
}
cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() {
- cg.RebalanceConsumberGroupInstances(reason)
+ cg.RebalanceConsumberGroupInstances(nil, reason)
cg.reBalanceTimer = nil
})
}
-func (cg *ConsumerGroup) OnPartitionListChange() {
+func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
- cg.RebalanceConsumberGroupInstances("partition list change")
+ partitionSlotToBrokerList := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
+ for _, assignment := range assignments {
+ partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker)
+ }
+ cg.RebalanceConsumberGroupInstances(partitionSlotToBrokerList, "partition list change")
}
-func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) {
- println("rebalance due to", reason, "...")
+func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) {
+ glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason)
now := time.Now().UnixNano()
// collect current topic partitions
- partitionSlotToBrokerList, found := cg.pubBalancer.TopicToBrokers.Get(cg.topic.String())
- if !found {
- glog.V(0).Infof("topic %s not found in balancer", cg.topic.String())
- return
+ partitionSlotToBrokerList := knownPartitionSlotToBrokerList
+ if partitionSlotToBrokerList == nil {
+ var found bool
+ partitionSlotToBrokerList, found = cg.pubBalancer.TopicToBrokers.Get(cg.topic.String())
+ if !found {
+ glog.V(0).Infof("topic %s not found in balancer", cg.topic.String())
+ return
+ }
}
// collect current consumer group instance ids
- consumerInstanceIds := make([]string, 0)
+ var consumerInstanceIds []string
for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() {
consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId)
}
@@ -116,6 +124,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) {
},
},
}
+ println("sending response to", consumerGroupInstance.InstanceId, "...")
consumerGroupInstance.ResponseChan <- response
}
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index 9a88c383a..269c12e66 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -28,14 +28,16 @@ func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
}
}
-func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic) *TopicConsumerGroups {
+func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
topicName := toTopicName(topic)
tcg, _ := c.TopicSubscribers.Get(topicName)
- if tcg == nil {
+ if tcg == nil && createIfMissing{
tcg = &TopicConsumerGroups{
ConsumerGroups: cmap.New[*ConsumerGroup](),
}
- c.TopicSubscribers.Set(topicName, tcg)
+ if !c.TopicSubscribers.SetIfAbsent(topicName, tcg) {
+ tcg, _ = c.TopicSubscribers.Get(topicName)
+ }
}
return tcg
}
@@ -50,23 +52,27 @@ func toTopicName(topic *mq_pb.Topic) string {
}
func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
- tcg := c.GetTopicConsumerGroups(topic)
+ tcg := c.GetTopicConsumerGroups(topic, true)
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
if cg == nil {
cg = NewConsumerGroup(topic, c.balancer)
- tcg.ConsumerGroups.Set(consumerGroup, cg)
+ if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg){
+ cg, _ = tcg.ConsumerGroups.Get(consumerGroup)
+ }
}
cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
if cgi == nil {
cgi = NewConsumerGroupInstance(consumerGroupInstance)
- cg.ConsumerGroupInstances.Set(consumerGroupInstance, cgi)
+ if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi){
+ cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance)
+ }
}
cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic)
return cgi
}
func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) {
- tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
+ tcg := c.GetTopicConsumerGroups(topic, false)
if tcg == nil {
return
}
@@ -83,3 +89,13 @@ func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance stri
c.RemoveTopic(topic)
}
}
+
+func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) {
+ tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
+ if tcg == nil {
+ return
+ }
+ for _, cg := range tcg.ConsumerGroups.Items() {
+ cg.OnPartitionListChange(assignments)
+ }
+}