diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_create.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_create.go | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_create.go b/weed/mq/broker/broker_grpc_create.go index 12b76e565..cb9c91f28 100644 --- a/weed/mq/broker/broker_grpc_create.go +++ b/weed/mq/broker/broker_grpc_create.go @@ -3,6 +3,7 @@ package broker import ( "context" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -27,6 +28,18 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p ret := &mq_pb.CreateTopicResponse{} ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) + for _, bpa := range ret.BrokerPartitionAssignments { + if doCreateErr := broker.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { + _, doCreateErr := client.DoCreateTopic(ctx, &mq_pb.DoCreateTopicRequest{ + Topic: request.Topic, + Partition: bpa.Partition, + }) + return doCreateErr + }); doCreateErr != nil { + return nil, doCreateErr + } + } + return ret, err } |
