aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-13 18:30:10 -0700
committerchrislu <chris.lu@gmail.com>2024-05-13 18:30:10 -0700
commit372bd8d71d0ef156057da4769de3cbaed201af26 (patch)
tree2f96de44d1f2c1e9cd80acaec1a6c6b7401db804
parent2142842f8248e841e900ee75783726cbd473ecf0 (diff)
downloadseaweedfs-372bd8d71d0ef156057da4769de3cbaed201af26.tar.xz
seaweedfs-372bd8d71d0ef156057da4769de3cbaed201af26.zip
consumer instance passing MaxPartitionCount to coordinator
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go1
-rw-r--r--weed/mq/client/sub_client/subscriber.go2
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go1
-rw-r--r--weed/mq/sub_coordinator/coordinator.go1
-rw-r--r--weed/pb/mq.proto1
-rw-r--r--weed/pb/mq_pb/mq.pb.go17
6 files changed, 19 insertions, 4 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index b0b533e42..ef7841725 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -51,6 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
+ MaxPartitionCount: sub.ProcessorConfig.ConcurrentPartitionLimit,
},
},
}); err != nil {
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 3f343ed1d..723520c1d 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -21,7 +21,7 @@ type ContentConfiguration struct {
}
type ProcessorConfiguration struct {
- ConcurrentPartitionLimit int // how many partitions to process concurrently
+ ConcurrentPartitionLimit int32 // how many partitions to process concurrently
}
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error)
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index d24a38d8a..f4938d539 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -14,6 +14,7 @@ type ConsumerGroupInstance struct {
// the consumer group instance may not have an active partition
Partitions []*topic.Partition
ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
+ MaxPartitionCount int32
}
type ConsumerGroup struct {
topic topic.Topic
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index d128310d1..62c44fd48 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -67,6 +67,7 @@ func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinato
cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
}
}
+ cgi.MaxPartitionCount = initMessage.MaxPartitionCount
cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic)
return cgi
}
diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto
index 9e4874f30..a6bc89ad2 100644
--- a/weed/pb/mq.proto
+++ b/weed/pb/mq.proto
@@ -163,6 +163,7 @@ message SubscriberToSubCoordinatorRequest {
string consumer_group = 1;
string consumer_group_instance_id = 2;
Topic topic = 3;
+ int32 max_partition_count = 4;
}
message AckMessage {
Partition partition = 1;
diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go
index ff381aec9..6a4fb8e1e 100644
--- a/weed/pb/mq_pb/mq.pb.go
+++ b/weed/pb/mq_pb/mq.pb.go
@@ -2195,6 +2195,7 @@ type SubscriberToSubCoordinatorRequest_InitMessage struct {
ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"`
ConsumerGroupInstanceId string `protobuf:"bytes,2,opt,name=consumer_group_instance_id,json=consumerGroupInstanceId,proto3" json:"consumer_group_instance_id,omitempty"`
Topic *Topic `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
+ MaxPartitionCount int32 `protobuf:"varint,4,opt,name=max_partition_count,json=maxPartitionCount,proto3" json:"max_partition_count,omitempty"`
}
func (x *SubscriberToSubCoordinatorRequest_InitMessage) Reset() {
@@ -2250,6 +2251,13 @@ func (x *SubscriberToSubCoordinatorRequest_InitMessage) GetTopic() *Topic {
return nil
}
+func (x *SubscriberToSubCoordinatorRequest_InitMessage) GetMaxPartitionCount() int32 {
+ if x != nil {
+ return x.MaxPartitionCount
+ }
+ return 0
+}
+
type SubscriberToSubCoordinatorRequest_AckMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -2946,7 +2954,7 @@ var file_mq_proto_rawDesc = []byte{
0x28, 0x08, 0x52, 0x0a, 0x69, 0x73, 0x44, 0x72, 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x1f,
0x0a, 0x1d, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72,
0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
- 0xca, 0x03, 0x0a, 0x21, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f,
+ 0xfa, 0x03, 0x0a, 0x21, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f,
0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x51, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
@@ -2958,7 +2966,7 @@ var file_mq_proto_rawDesc = []byte{
0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54,
0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
- 0x65, 0x48, 0x00, 0x52, 0x03, 0x61, 0x63, 0x6b, 0x1a, 0x9c, 0x01, 0x0a, 0x0b, 0x49, 0x6e, 0x69,
+ 0x65, 0x48, 0x00, 0x52, 0x03, 0x61, 0x63, 0x6b, 0x1a, 0xcc, 0x01, 0x0a, 0x0b, 0x49, 0x6e, 0x69,
0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73,
0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12,
@@ -2968,7 +2976,10 @@ var file_mq_proto_rawDesc = []byte{
0x75, 0x70, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x05,
0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63,
- 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x1a, 0x58, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65,
+ 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x2e, 0x0a, 0x13, 0x6d, 0x61, 0x78, 0x5f, 0x70,
+ 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x04,
+ 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, 0x6d, 0x61, 0x78, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+ 0x6f, 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x58, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f,