diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_configure.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_configure.go | 57 |
1 files changed, 51 insertions, 6 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index fb916d880..40dd71db1 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -6,11 +6,13 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) // ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer @@ -28,8 +30,11 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return resp, err } - // validate the schema - if request.RecordType != nil { + // Validate flat schema format + if request.MessageRecordType != nil && len(request.KeyColumns) > 0 { + if err := schema.ValidateKeyColumns(request.MessageRecordType, request.KeyColumns); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid key columns: %v", err) + } } t := topic.FromPbTopic(request.Topic) @@ -47,8 +52,36 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { - glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) - return + // Check if schema needs to be updated + schemaChanged := false + + if request.MessageRecordType != nil && resp.MessageRecordType != nil { + if !proto.Equal(request.MessageRecordType, resp.MessageRecordType) { + schemaChanged = true + } + } else if request.MessageRecordType != nil || resp.MessageRecordType != nil { + schemaChanged = true + } + + if !schemaChanged { + glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) + return resp, nil + } + + // Update schema in existing configuration + resp.MessageRecordType = request.MessageRecordType + resp.KeyColumns = request.KeyColumns + resp.SchemaFormat = request.SchemaFormat + + if err := b.fca.SaveTopicConfToFiler(t, resp); err != nil { + return nil, fmt.Errorf("update topic schemas: %w", err) + } + + // Invalidate TopicExists cache since we just updated the topic + b.invalidateTopicExistsCache(t) + + glog.V(0).Infof("updated schemas for topic %s", request.Topic) + return resp, nil } if resp != nil && len(resp.BrokerPartitionAssignments) > 0 { @@ -61,7 +94,10 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return nil, status.Errorf(codes.Unavailable, "no broker available: %v", pub_balancer.ErrNoBroker) } resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.PubBalancer.Brokers, request.PartitionCount) - resp.RecordType = request.RecordType + // Set flat schema format + resp.MessageRecordType = request.MessageRecordType + resp.KeyColumns = request.KeyColumns + resp.SchemaFormat = request.SchemaFormat resp.Retention = request.Retention // save the topic configuration on filer @@ -69,9 +105,18 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return nil, fmt.Errorf("configure topic: %w", err) } + // Invalidate TopicExists cache since we just created/updated the topic + b.invalidateTopicExistsCache(t) + b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) + // Actually assign the new partitions to brokers and add to localTopicManager + if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, true); assignErr != nil { + glog.Errorf("assign topic %s partitions to brokers: %v", request.Topic, assignErr) + return nil, fmt.Errorf("assign topic partitions: %w", assignErr) + } + glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) - return resp, err + return resp, nil } |
