aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/dash')
-rw-r--r--weed/admin/dash/admin_server.go5
-rw-r--r--weed/admin/dash/mq_management.go74
-rw-r--r--weed/admin/dash/types.go3
-rw-r--r--weed/admin/dash/volume_management.go7
4 files changed, 82 insertions, 7 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go
index 3f135ee1b..4a1dd592f 100644
--- a/weed/admin/dash/admin_server.go
+++ b/weed/admin/dash/admin_server.go
@@ -1766,8 +1766,9 @@ func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool,
},
// Preserve existing partition count - this is critical!
PartitionCount: currentConfig.PartitionCount,
- // Preserve existing record type if it exists
- RecordType: currentConfig.RecordType,
+ // Preserve existing schema if it exists
+ MessageRecordType: currentConfig.MessageRecordType,
+ KeyColumns: currentConfig.KeyColumns,
}
// Update only the retention configuration
diff --git a/weed/admin/dash/mq_management.go b/weed/admin/dash/mq_management.go
index 5e513af1e..3fd4aed85 100644
--- a/weed/admin/dash/mq_management.go
+++ b/weed/admin/dash/mq_management.go
@@ -181,7 +181,6 @@ func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetail
Namespace: namespace,
Name: topicName,
Partitions: []PartitionInfo{},
- Schema: []SchemaFieldInfo{},
Publishers: []PublisherInfo{},
Subscribers: []TopicSubscriberInfo{},
ConsumerGroupOffsets: []ConsumerGroupOffsetInfo{},
@@ -214,9 +213,33 @@ func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetail
}
}
- // Process schema from RecordType
- if configResp.RecordType != nil {
- topicDetails.Schema = convertRecordTypeToSchemaFields(configResp.RecordType)
+ // Process flat schema format
+ if configResp.MessageRecordType != nil {
+ for _, field := range configResp.MessageRecordType.Fields {
+ isKey := false
+ for _, keyCol := range configResp.KeyColumns {
+ if field.Name == keyCol {
+ isKey = true
+ break
+ }
+ }
+
+ fieldType := "UNKNOWN"
+ if field.Type != nil && field.Type.Kind != nil {
+ fieldType = getFieldTypeName(field.Type)
+ }
+
+ schemaField := SchemaFieldInfo{
+ Name: field.Name,
+ Type: fieldType,
+ }
+
+ if isKey {
+ topicDetails.KeySchema = append(topicDetails.KeySchema, schemaField)
+ } else {
+ topicDetails.ValueSchema = append(topicDetails.ValueSchema, schemaField)
+ }
+ }
}
// Get publishers information
@@ -613,3 +636,46 @@ func convertTopicRetention(retention *mq_pb.TopicRetention) TopicRetentionInfo {
DisplayUnit: displayUnit,
}
}
+
+// getFieldTypeName converts a schema_pb.Type to a human-readable type name
+func getFieldTypeName(fieldType *schema_pb.Type) string {
+ if fieldType.Kind == nil {
+ return "UNKNOWN"
+ }
+
+ switch kind := fieldType.Kind.(type) {
+ case *schema_pb.Type_ScalarType:
+ switch kind.ScalarType {
+ case schema_pb.ScalarType_BOOL:
+ return "BOOLEAN"
+ case schema_pb.ScalarType_INT32:
+ return "INT32"
+ case schema_pb.ScalarType_INT64:
+ return "INT64"
+ case schema_pb.ScalarType_FLOAT:
+ return "FLOAT"
+ case schema_pb.ScalarType_DOUBLE:
+ return "DOUBLE"
+ case schema_pb.ScalarType_BYTES:
+ return "BYTES"
+ case schema_pb.ScalarType_STRING:
+ return "STRING"
+ case schema_pb.ScalarType_TIMESTAMP:
+ return "TIMESTAMP"
+ case schema_pb.ScalarType_DATE:
+ return "DATE"
+ case schema_pb.ScalarType_TIME:
+ return "TIME"
+ case schema_pb.ScalarType_DECIMAL:
+ return "DECIMAL"
+ default:
+ return "SCALAR"
+ }
+ case *schema_pb.Type_ListType:
+ return "LIST"
+ case *schema_pb.Type_RecordType:
+ return "RECORD"
+ default:
+ return "UNKNOWN"
+ }
+}
diff --git a/weed/admin/dash/types.go b/weed/admin/dash/types.go
index 18c46a48d..8b793bdbd 100644
--- a/weed/admin/dash/types.go
+++ b/weed/admin/dash/types.go
@@ -404,7 +404,8 @@ type TopicDetailsData struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
Partitions []PartitionInfo `json:"partitions"`
- Schema []SchemaFieldInfo `json:"schema"`
+ KeySchema []SchemaFieldInfo `json:"key_schema"` // Schema fields for keys
+ ValueSchema []SchemaFieldInfo `json:"value_schema"` // Schema fields for values
Publishers []PublisherInfo `json:"publishers"`
Subscribers []TopicSubscriberInfo `json:"subscribers"`
ConsumerGroupOffsets []ConsumerGroupOffsetInfo `json:"consumer_group_offsets"`
diff --git a/weed/admin/dash/volume_management.go b/weed/admin/dash/volume_management.go
index 38b1257a4..c0be958a9 100644
--- a/weed/admin/dash/volume_management.go
+++ b/weed/admin/dash/volume_management.go
@@ -3,6 +3,7 @@ package dash
import (
"context"
"fmt"
+ "math"
"sort"
"time"
@@ -392,8 +393,14 @@ func (s *AdminServer) GetVolumeDetails(volumeID int, server string) (*VolumeDeta
// VacuumVolume performs a vacuum operation on a specific volume
func (s *AdminServer) VacuumVolume(volumeID int, server string) error {
+ // Validate volumeID range before converting to uint32
+ if volumeID < 0 || uint64(volumeID) > math.MaxUint32 {
+ return fmt.Errorf("volume ID out of range: %d", volumeID)
+ }
return s.WithMasterClient(func(client master_pb.SeaweedClient) error {
_, err := client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
+ // lgtm[go/incorrect-integer-conversion]
+ // Safe conversion: volumeID has been validated to be in range [0, 0xFFFFFFFF] above
VolumeId: uint32(volumeID),
GarbageThreshold: 0.0001, // A very low threshold to ensure all garbage is collected
Collection: "", // Empty for all collections