diff options
| author | chrislu <chris.lu@gmail.com> | 2024-04-12 22:29:53 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-04-12 22:29:53 -0700 |
| commit | 53d1d2b78a480ac9b8432b9a78d6f6c48a6cfbf7 (patch) | |
| tree | cd673793ef1d0338ce06e2ea62425ec27235bb45 | |
| parent | 04fb4c34e36fb14cc1c400486511d2a89dfab3ce (diff) | |
| download | seaweedfs-53d1d2b78a480ac9b8432b9a78d6f6c48a6cfbf7.tar.xz seaweedfs-53d1d2b78a480ac9b8432b9a78d6f6c48a6cfbf7.zip | |
save schema when configuring topic
| -rw-r--r-- | weed/mq/broker/broker_grpc_configure.go | 10 |
1 files changed, 10 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 6a6e92922..40ac8df23 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -27,6 +28,14 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return resp, err } + // validate the schema + if request.RecordType != nil { + if _, err = schema.NewSchema(request.RecordType); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid record type %+v: %v", request.RecordType, err) + } + } + + t := topic.FromPbTopic(request.Topic) var readErr, assignErr error resp, readErr = b.readTopicConfFromFiler(t) @@ -56,6 +65,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error()) } resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount) + resp.RecordType = request.RecordType // save the topic configuration on filer if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil { |
