aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-04-12 22:29:53 -0700
committerchrislu <chris.lu@gmail.com>2024-04-12 22:29:53 -0700
commit53d1d2b78a480ac9b8432b9a78d6f6c48a6cfbf7 (patch)
treecd673793ef1d0338ce06e2ea62425ec27235bb45
parent04fb4c34e36fb14cc1c400486511d2a89dfab3ce (diff)
downloadseaweedfs-53d1d2b78a480ac9b8432b9a78d6f6c48a6cfbf7.tar.xz
seaweedfs-53d1d2b78a480ac9b8432b9a78d6f6c48a6cfbf7.zip
save schema when configuring topic
-rw-r--r--weed/mq/broker/broker_grpc_configure.go10
1 files changed, 10 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 6a6e92922..40ac8df23 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -27,6 +28,14 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return resp, err
}
+ // validate the schema
+ if request.RecordType != nil {
+ if _, err = schema.NewSchema(request.RecordType); err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "invalid record type %+v: %v", request.RecordType, err)
+ }
+ }
+
+
t := topic.FromPbTopic(request.Topic)
var readErr, assignErr error
resp, readErr = b.readTopicConfFromFiler(t)
@@ -56,6 +65,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
}
resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
+ resp.RecordType = request.RecordType
// save the topic configuration on filer
if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil {