aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/client/pub_client/publisher.go9
1 files changed, 0 insertions, 9 deletions
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index be29efa1c..5a134b3c2 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -24,7 +24,6 @@ type PublishClient struct {
type TopicPublisher struct {
namespace string
topic string
- partition2Broker *interval.SearchTree[*PublishClient, int32]
partition2Buffer *interval.SearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage], int32]
grpcDialOption grpc.DialOption
sync.Mutex // protects grpc
@@ -36,9 +35,6 @@ func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration)
return &TopicPublisher{
namespace: namespace,
topic: topic,
- partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int {
- return int(a - b)
- }),
partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int {
return int(a - b)
}),
@@ -49,11 +45,6 @@ func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration)
func (p *TopicPublisher) Shutdown() error {
- if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
- for _, client := range clients {
- client.CloseSend()
- }
- }
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, inputBuffer := range inputBuffers {
inputBuffer.CloseInput()