diff options
Diffstat (limited to 'weed/admin/dash/mq_management.go')
| -rw-r--r-- | weed/admin/dash/mq_management.go | 74 |
1 files changed, 70 insertions, 4 deletions
diff --git a/weed/admin/dash/mq_management.go b/weed/admin/dash/mq_management.go index 5e513af1e..3fd4aed85 100644 --- a/weed/admin/dash/mq_management.go +++ b/weed/admin/dash/mq_management.go @@ -181,7 +181,6 @@ func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetail Namespace: namespace, Name: topicName, Partitions: []PartitionInfo{}, - Schema: []SchemaFieldInfo{}, Publishers: []PublisherInfo{}, Subscribers: []TopicSubscriberInfo{}, ConsumerGroupOffsets: []ConsumerGroupOffsetInfo{}, @@ -214,9 +213,33 @@ func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetail } } - // Process schema from RecordType - if configResp.RecordType != nil { - topicDetails.Schema = convertRecordTypeToSchemaFields(configResp.RecordType) + // Process flat schema format + if configResp.MessageRecordType != nil { + for _, field := range configResp.MessageRecordType.Fields { + isKey := false + for _, keyCol := range configResp.KeyColumns { + if field.Name == keyCol { + isKey = true + break + } + } + + fieldType := "UNKNOWN" + if field.Type != nil && field.Type.Kind != nil { + fieldType = getFieldTypeName(field.Type) + } + + schemaField := SchemaFieldInfo{ + Name: field.Name, + Type: fieldType, + } + + if isKey { + topicDetails.KeySchema = append(topicDetails.KeySchema, schemaField) + } else { + topicDetails.ValueSchema = append(topicDetails.ValueSchema, schemaField) + } + } } // Get publishers information @@ -613,3 +636,46 @@ func convertTopicRetention(retention *mq_pb.TopicRetention) TopicRetentionInfo { DisplayUnit: displayUnit, } } + +// getFieldTypeName converts a schema_pb.Type to a human-readable type name +func getFieldTypeName(fieldType *schema_pb.Type) string { + if fieldType.Kind == nil { + return "UNKNOWN" + } + + switch kind := fieldType.Kind.(type) { + case *schema_pb.Type_ScalarType: + switch kind.ScalarType { + case schema_pb.ScalarType_BOOL: + return "BOOLEAN" + case schema_pb.ScalarType_INT32: + return "INT32" + case schema_pb.ScalarType_INT64: + return "INT64" + case schema_pb.ScalarType_FLOAT: + return "FLOAT" + case schema_pb.ScalarType_DOUBLE: + return "DOUBLE" + case schema_pb.ScalarType_BYTES: + return "BYTES" + case schema_pb.ScalarType_STRING: + return "STRING" + case schema_pb.ScalarType_TIMESTAMP: + return "TIMESTAMP" + case schema_pb.ScalarType_DATE: + return "DATE" + case schema_pb.ScalarType_TIME: + return "TIME" + case schema_pb.ScalarType_DECIMAL: + return "DECIMAL" + default: + return "SCALAR" + } + case *schema_pb.Type_ListType: + return "LIST" + case *schema_pb.Type_RecordType: + return "RECORD" + default: + return "UNKNOWN" + } +} |
