aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_configure.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_configure.go')
-rw-r--r--weed/mq/broker/broker_grpc_configure.go57
1 files changed, 51 insertions, 6 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index fb916d880..40dd71db1 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -6,11 +6,13 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
)
// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
@@ -28,8 +30,11 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return resp, err
}
- // validate the schema
- if request.RecordType != nil {
+ // Validate flat schema format
+ if request.MessageRecordType != nil && len(request.KeyColumns) > 0 {
+ if err := schema.ValidateKeyColumns(request.MessageRecordType, request.KeyColumns); err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "invalid key columns: %v", err)
+ }
}
t := topic.FromPbTopic(request.Topic)
@@ -47,8 +52,36 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
- glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
- return
+ // Check if schema needs to be updated
+ schemaChanged := false
+
+ if request.MessageRecordType != nil && resp.MessageRecordType != nil {
+ if !proto.Equal(request.MessageRecordType, resp.MessageRecordType) {
+ schemaChanged = true
+ }
+ } else if request.MessageRecordType != nil || resp.MessageRecordType != nil {
+ schemaChanged = true
+ }
+
+ if !schemaChanged {
+ glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
+ return resp, nil
+ }
+
+ // Update schema in existing configuration
+ resp.MessageRecordType = request.MessageRecordType
+ resp.KeyColumns = request.KeyColumns
+ resp.SchemaFormat = request.SchemaFormat
+
+ if err := b.fca.SaveTopicConfToFiler(t, resp); err != nil {
+ return nil, fmt.Errorf("update topic schemas: %w", err)
+ }
+
+ // Invalidate TopicExists cache since we just updated the topic
+ b.invalidateTopicExistsCache(t)
+
+ glog.V(0).Infof("updated schemas for topic %s", request.Topic)
+ return resp, nil
}
if resp != nil && len(resp.BrokerPartitionAssignments) > 0 {
@@ -61,7 +94,10 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return nil, status.Errorf(codes.Unavailable, "no broker available: %v", pub_balancer.ErrNoBroker)
}
resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.PubBalancer.Brokers, request.PartitionCount)
- resp.RecordType = request.RecordType
+ // Set flat schema format
+ resp.MessageRecordType = request.MessageRecordType
+ resp.KeyColumns = request.KeyColumns
+ resp.SchemaFormat = request.SchemaFormat
resp.Retention = request.Retention
// save the topic configuration on filer
@@ -69,9 +105,18 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return nil, fmt.Errorf("configure topic: %w", err)
}
+ // Invalidate TopicExists cache since we just created/updated the topic
+ b.invalidateTopicExistsCache(t)
+
b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
+ // Actually assign the new partitions to brokers and add to localTopicManager
+ if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, true); assignErr != nil {
+ glog.Errorf("assign topic %s partitions to brokers: %v", request.Topic, assignErr)
+ return nil, fmt.Errorf("assign topic partitions: %w", assignErr)
+ }
+
glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
- return resp, err
+ return resp, nil
}