diff options
Diffstat (limited to 'weed/query/engine/catalog.go')
| -rw-r--r-- | weed/query/engine/catalog.go | 40 |
1 files changed, 36 insertions, 4 deletions
diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go index 4cd39f3f0..f53e4cb2a 100644 --- a/weed/query/engine/catalog.go +++ b/weed/query/engine/catalog.go @@ -17,9 +17,9 @@ import ( type BrokerClientInterface interface { ListNamespaces(ctx context.Context) ([]string, error) ListTopics(ctx context.Context, namespace string) ([]string, error) - GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) + GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) // Returns (flatSchema, keyColumns, schemaFormat, error) + ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error GetFilerClient() (filer_pb.FilerClient, error) - ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error DeleteTopic(ctx context.Context, namespace, topicName string) error // GetUnflushedMessages returns only messages that haven't been flushed to disk yet // This prevents double-counting when combining with disk-based data @@ -151,12 +151,24 @@ func (c *SchemaCatalog) ListTables(database string) ([]string, error) { tables := make([]string, 0, len(db.Tables)) for name := range db.Tables { + // Skip .meta table + if name == ".meta" { + continue + } tables = append(tables, name) } return tables, nil } - return topics, nil + // Filter out .meta table from topics + filtered := make([]string, 0, len(topics)) + for _, topic := range topics { + if topic != ".meta" { + filtered = append(filtered, topic) + } + } + + return filtered, nil } // GetTableInfo returns detailed schema information for a table @@ -185,7 +197,7 @@ func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table) + recordType, _, _, err := c.brokerClient.GetTopicSchema(ctx, database, table) if err != nil { // If broker unavailable and we have expired cached data, return it if exists { @@ -278,7 +290,27 @@ func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *sch // 1. MQ scalar types map directly to SQL types // 2. Complex types (arrays, maps) are serialized as JSON strings // 3. All fields are nullable unless specifically marked otherwise +// 4. If no schema is defined, create a default schema with system fields and _value func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) { + // Check if the schema has a valid RecordType + if mqSchema == nil || mqSchema.RecordType == nil { + // For topics without schema, create a default schema with system fields and _value + columns := []ColumnInfo{ + {Name: SW_DISPLAY_NAME_TIMESTAMP, Type: "TIMESTAMP", Nullable: true}, + {Name: SW_COLUMN_NAME_KEY, Type: "VARBINARY", Nullable: true}, + {Name: SW_COLUMN_NAME_SOURCE, Type: "VARCHAR(255)", Nullable: true}, + {Name: SW_COLUMN_NAME_VALUE, Type: "VARBINARY", Nullable: true}, + } + + return &TableInfo{ + Name: topicName, + Namespace: namespace, + Schema: nil, // No schema defined + Columns: columns, + RevisionId: 0, + }, nil + } + columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields)) for i, field := range mqSchema.RecordType.Fields { |
