diff options
Diffstat (limited to 'weed/admin/dash')
| -rw-r--r-- | weed/admin/dash/admin_server.go | 5 | ||||
| -rw-r--r-- | weed/admin/dash/mq_management.go | 74 | ||||
| -rw-r--r-- | weed/admin/dash/types.go | 3 | ||||
| -rw-r--r-- | weed/admin/dash/volume_management.go | 7 |
4 files changed, 82 insertions, 7 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 3f135ee1b..4a1dd592f 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -1766,8 +1766,9 @@ func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, }, // Preserve existing partition count - this is critical! PartitionCount: currentConfig.PartitionCount, - // Preserve existing record type if it exists - RecordType: currentConfig.RecordType, + // Preserve existing schema if it exists + MessageRecordType: currentConfig.MessageRecordType, + KeyColumns: currentConfig.KeyColumns, } // Update only the retention configuration diff --git a/weed/admin/dash/mq_management.go b/weed/admin/dash/mq_management.go index 5e513af1e..3fd4aed85 100644 --- a/weed/admin/dash/mq_management.go +++ b/weed/admin/dash/mq_management.go @@ -181,7 +181,6 @@ func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetail Namespace: namespace, Name: topicName, Partitions: []PartitionInfo{}, - Schema: []SchemaFieldInfo{}, Publishers: []PublisherInfo{}, Subscribers: []TopicSubscriberInfo{}, ConsumerGroupOffsets: []ConsumerGroupOffsetInfo{}, @@ -214,9 +213,33 @@ func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetail } } - // Process schema from RecordType - if configResp.RecordType != nil { - topicDetails.Schema = convertRecordTypeToSchemaFields(configResp.RecordType) + // Process flat schema format + if configResp.MessageRecordType != nil { + for _, field := range configResp.MessageRecordType.Fields { + isKey := false + for _, keyCol := range configResp.KeyColumns { + if field.Name == keyCol { + isKey = true + break + } + } + + fieldType := "UNKNOWN" + if field.Type != nil && field.Type.Kind != nil { + fieldType = getFieldTypeName(field.Type) + } + + schemaField := SchemaFieldInfo{ + Name: field.Name, + Type: fieldType, + } + + if isKey { + topicDetails.KeySchema = append(topicDetails.KeySchema, schemaField) + } else { + topicDetails.ValueSchema = append(topicDetails.ValueSchema, schemaField) + } + } } // Get publishers information @@ -613,3 +636,46 @@ func convertTopicRetention(retention *mq_pb.TopicRetention) TopicRetentionInfo { DisplayUnit: displayUnit, } } + +// getFieldTypeName converts a schema_pb.Type to a human-readable type name +func getFieldTypeName(fieldType *schema_pb.Type) string { + if fieldType.Kind == nil { + return "UNKNOWN" + } + + switch kind := fieldType.Kind.(type) { + case *schema_pb.Type_ScalarType: + switch kind.ScalarType { + case schema_pb.ScalarType_BOOL: + return "BOOLEAN" + case schema_pb.ScalarType_INT32: + return "INT32" + case schema_pb.ScalarType_INT64: + return "INT64" + case schema_pb.ScalarType_FLOAT: + return "FLOAT" + case schema_pb.ScalarType_DOUBLE: + return "DOUBLE" + case schema_pb.ScalarType_BYTES: + return "BYTES" + case schema_pb.ScalarType_STRING: + return "STRING" + case schema_pb.ScalarType_TIMESTAMP: + return "TIMESTAMP" + case schema_pb.ScalarType_DATE: + return "DATE" + case schema_pb.ScalarType_TIME: + return "TIME" + case schema_pb.ScalarType_DECIMAL: + return "DECIMAL" + default: + return "SCALAR" + } + case *schema_pb.Type_ListType: + return "LIST" + case *schema_pb.Type_RecordType: + return "RECORD" + default: + return "UNKNOWN" + } +} diff --git a/weed/admin/dash/types.go b/weed/admin/dash/types.go index 18c46a48d..8b793bdbd 100644 --- a/weed/admin/dash/types.go +++ b/weed/admin/dash/types.go @@ -404,7 +404,8 @@ type TopicDetailsData struct { Namespace string `json:"namespace"` Name string `json:"name"` Partitions []PartitionInfo `json:"partitions"` - Schema []SchemaFieldInfo `json:"schema"` + KeySchema []SchemaFieldInfo `json:"key_schema"` // Schema fields for keys + ValueSchema []SchemaFieldInfo `json:"value_schema"` // Schema fields for values Publishers []PublisherInfo `json:"publishers"` Subscribers []TopicSubscriberInfo `json:"subscribers"` ConsumerGroupOffsets []ConsumerGroupOffsetInfo `json:"consumer_group_offsets"` diff --git a/weed/admin/dash/volume_management.go b/weed/admin/dash/volume_management.go index 38b1257a4..c0be958a9 100644 --- a/weed/admin/dash/volume_management.go +++ b/weed/admin/dash/volume_management.go @@ -3,6 +3,7 @@ package dash import ( "context" "fmt" + "math" "sort" "time" @@ -392,8 +393,14 @@ func (s *AdminServer) GetVolumeDetails(volumeID int, server string) (*VolumeDeta // VacuumVolume performs a vacuum operation on a specific volume func (s *AdminServer) VacuumVolume(volumeID int, server string) error { + // Validate volumeID range before converting to uint32 + if volumeID < 0 || uint64(volumeID) > math.MaxUint32 { + return fmt.Errorf("volume ID out of range: %d", volumeID) + } return s.WithMasterClient(func(client master_pb.SeaweedClient) error { _, err := client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{ + // lgtm[go/incorrect-integer-conversion] + // Safe conversion: volumeID has been validated to be in range [0, 0xFFFFFFFF] above VolumeId: uint32(volumeID), GarbageThreshold: 0.0001, // A very low threshold to ensure all garbage is collected Collection: "", // Empty for all collections |
