aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-23 09:21:48 -0700
committerchrislu <chris.lu@gmail.com>2024-05-23 09:21:48 -0700
commit37d1ee562d73dfa8f130fc8b9c0bf88fd3337c16 (patch)
tree7c0437741d3d3cb74e79855f9fd87adf3e850a63
parentcdeaaf95b40336d924aae3383d80e6a708a36d2d (diff)
downloadseaweedfs-37d1ee562d73dfa8f130fc8b9c0bf88fd3337c16.tar.xz
seaweedfs-37d1ee562d73dfa8f130fc8b9c0bf88fd3337c16.zip
refactor
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go19
-rw-r--r--weed/mq/sub_coordinator/consumer_group_instance.go26
2 files changed, 26 insertions, 19 deletions
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index 4f4265e5e..3b8f90fa5 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -1,7 +1,6 @@
package sub_coordinator
import (
- "fmt"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
@@ -10,18 +9,6 @@ import (
"time"
)
-type ConsumerGroupInstance struct {
- InstanceId string
- // the consumer group instance may not have an active partition
- Partitions []*topic.Partition
- ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
- MaxPartitionCount int32
-}
-
-func (i ConsumerGroupInstance) AckUnAssignment(assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
- fmt.Printf("ack unassignment %v\n", assignment)
-}
-
type ConsumerGroup struct {
topic topic.Topic
// map a consumer group instance id to a consumer group instance
@@ -42,12 +29,6 @@ func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.PubBalancer, fil
}
}
-func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
- return &ConsumerGroupInstance{
- InstanceId: instanceId,
- ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
- }
-}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
cg.onConsumerGroupInstanceChange(true, "add consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
}
diff --git a/weed/mq/sub_coordinator/consumer_group_instance.go b/weed/mq/sub_coordinator/consumer_group_instance.go
new file mode 100644
index 000000000..3fac28358
--- /dev/null
+++ b/weed/mq/sub_coordinator/consumer_group_instance.go
@@ -0,0 +1,26 @@
+package sub_coordinator
+
+import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+type ConsumerGroupInstance struct {
+ InstanceId string
+ // the consumer group instance may not have an active partition
+ Partitions []*topic.Partition
+ ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
+ MaxPartitionCount int32
+}
+
+func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
+ return &ConsumerGroupInstance{
+ InstanceId: instanceId,
+ ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
+ }
+}
+
+func (i ConsumerGroupInstance) AckUnAssignment(assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
+ fmt.Printf("ack unassignment %v\n", assignment)
+}