aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash/mq_management.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/dash/mq_management.go')
-rw-r--r--weed/admin/dash/mq_management.go74
1 files changed, 70 insertions, 4 deletions
diff --git a/weed/admin/dash/mq_management.go b/weed/admin/dash/mq_management.go
index 5e513af1e..3fd4aed85 100644
--- a/weed/admin/dash/mq_management.go
+++ b/weed/admin/dash/mq_management.go
@@ -181,7 +181,6 @@ func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetail
Namespace: namespace,
Name: topicName,
Partitions: []PartitionInfo{},
- Schema: []SchemaFieldInfo{},
Publishers: []PublisherInfo{},
Subscribers: []TopicSubscriberInfo{},
ConsumerGroupOffsets: []ConsumerGroupOffsetInfo{},
@@ -214,9 +213,33 @@ func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetail
}
}
- // Process schema from RecordType
- if configResp.RecordType != nil {
- topicDetails.Schema = convertRecordTypeToSchemaFields(configResp.RecordType)
+ // Process flat schema format
+ if configResp.MessageRecordType != nil {
+ for _, field := range configResp.MessageRecordType.Fields {
+ isKey := false
+ for _, keyCol := range configResp.KeyColumns {
+ if field.Name == keyCol {
+ isKey = true
+ break
+ }
+ }
+
+ fieldType := "UNKNOWN"
+ if field.Type != nil && field.Type.Kind != nil {
+ fieldType = getFieldTypeName(field.Type)
+ }
+
+ schemaField := SchemaFieldInfo{
+ Name: field.Name,
+ Type: fieldType,
+ }
+
+ if isKey {
+ topicDetails.KeySchema = append(topicDetails.KeySchema, schemaField)
+ } else {
+ topicDetails.ValueSchema = append(topicDetails.ValueSchema, schemaField)
+ }
+ }
}
// Get publishers information
@@ -613,3 +636,46 @@ func convertTopicRetention(retention *mq_pb.TopicRetention) TopicRetentionInfo {
DisplayUnit: displayUnit,
}
}
+
+// getFieldTypeName converts a schema_pb.Type to a human-readable type name
+func getFieldTypeName(fieldType *schema_pb.Type) string {
+ if fieldType.Kind == nil {
+ return "UNKNOWN"
+ }
+
+ switch kind := fieldType.Kind.(type) {
+ case *schema_pb.Type_ScalarType:
+ switch kind.ScalarType {
+ case schema_pb.ScalarType_BOOL:
+ return "BOOLEAN"
+ case schema_pb.ScalarType_INT32:
+ return "INT32"
+ case schema_pb.ScalarType_INT64:
+ return "INT64"
+ case schema_pb.ScalarType_FLOAT:
+ return "FLOAT"
+ case schema_pb.ScalarType_DOUBLE:
+ return "DOUBLE"
+ case schema_pb.ScalarType_BYTES:
+ return "BYTES"
+ case schema_pb.ScalarType_STRING:
+ return "STRING"
+ case schema_pb.ScalarType_TIMESTAMP:
+ return "TIMESTAMP"
+ case schema_pb.ScalarType_DATE:
+ return "DATE"
+ case schema_pb.ScalarType_TIME:
+ return "TIME"
+ case schema_pb.ScalarType_DECIMAL:
+ return "DECIMAL"
+ default:
+ return "SCALAR"
+ }
+ case *schema_pb.Type_ListType:
+ return "LIST"
+ case *schema_pb.Type_RecordType:
+ return "RECORD"
+ default:
+ return "UNKNOWN"
+ }
+}