aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_create.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_create.go')
-rw-r--r--weed/mq/broker/broker_grpc_create.go13
1 files changed, 13 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_create.go b/weed/mq/broker/broker_grpc_create.go
index 12b76e565..cb9c91f28 100644
--- a/weed/mq/broker/broker_grpc_create.go
+++ b/weed/mq/broker/broker_grpc_create.go
@@ -3,6 +3,7 @@ package broker
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -27,6 +28,18 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p
ret := &mq_pb.CreateTopicResponse{}
ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
+ for _, bpa := range ret.BrokerPartitionAssignments {
+ if doCreateErr := broker.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
+ _, doCreateErr := client.DoCreateTopic(ctx, &mq_pb.DoCreateTopicRequest{
+ Topic: request.Topic,
+ Partition: bpa.Partition,
+ })
+ return doCreateErr
+ }); doCreateErr != nil {
+ return nil, doCreateErr
+ }
+ }
+
return ret, err
}